diff --git a/server/src/internalClusterTest/java/org/elasticsearch/common/network/ThreadWatchdogIT.java b/server/src/internalClusterTest/java/org/elasticsearch/common/network/ThreadWatchdogIT.java index ffe55387d34c6..d13249a9bfe5b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/common/network/ThreadWatchdogIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/common/network/ThreadWatchdogIT.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.Request; import org.elasticsearch.client.internal.node.NodeClient; @@ -137,7 +138,7 @@ public void testThreadWatchdogTransportLogging() { EmptyRequest::new, (request, channel, task) -> { blockAndWaitForWatchdogLogs(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } ); @@ -149,7 +150,7 @@ public void testThreadWatchdogTransportLogging() { new EmptyRequest(), new ActionListenerResponseHandler( l, - in -> TransportResponse.Empty.INSTANCE, + in -> ActionResponse.Empty.INSTANCE, EsExecutors.DIRECT_EXECUTOR_SERVICE ) ) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index ccbd3b823da4b..fc77daafac12a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; @@ -161,7 +162,7 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, in -> TransportResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER) + new ActionListenerResponseHandler<>(listener, in -> ActionResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER) ); } @@ -418,14 +419,14 @@ public static void registerRequestHandler(TransportService transportService, Sea ClearScrollContextsRequest::new, (request, channel, task) -> { searchService.freeAllScrollContexts(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } ); TransportActionProxy.registerProxyAction( transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, false, - (in) -> TransportResponse.Empty.INSTANCE + (in) -> ActionResponse.Empty.INSTANCE ); transportService.registerRequestHandler( diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 9e5194dfc479f..f6e2a3283898c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.cluster.ClusterState; @@ -57,7 +58,6 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -291,7 +291,7 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel, logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure); taskQueue.submitTask( "shard-failed " + request.toStringNoFailureStackTrace(), - new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)), + new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> ActionResponse.Empty.INSTANCE)), null ); } @@ -602,7 +602,7 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel, logger.debug("{} received shard started for [{}]", request.shardId, request); taskQueue.submitTask( "shard-started " + request, - new StartedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)), + new StartedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> ActionResponse.Empty.INSTANCE)), null ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a508f74651f91..3e474f07b43db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; @@ -77,7 +78,6 @@ import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.Transports; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index 6ab4ae82ced9b..f3b4da611811a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.cluster.coordination.Coordinator.Mode; @@ -37,7 +38,6 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions.Type; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 46ffb7cc2b9c9..ec87df9da2d5e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.Coordinator.Mode; @@ -45,7 +46,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java index 9d5d74fa24648..32f57d1cbd1db 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java @@ -14,6 +14,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; @@ -39,7 +40,6 @@ import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -143,7 +143,7 @@ public JoinValidationService( ); } joinValidators.forEach(joinValidator -> joinValidator.accept(transportService.getLocalNode(), remoteState)); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } ); } @@ -343,7 +343,7 @@ protected void doRun() { REQUEST_OPTIONS, new CleanableResponseHandler<>( listener.map(ignored -> null), - in -> TransportResponse.Empty.INSTANCE, + in -> ActionResponse.Empty.INSTANCE, responseExecutor, bytes::decRef ) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 5108daac3ec4d..9cc079e4d897d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; @@ -36,7 +37,6 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions.Type; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 35c249596dd37..359d084342a74 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -565,7 +565,7 @@ public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, .handoffPrimaryContext( request.primaryContext(), ActionListener.runBefore( - new ChannelActionListener<>(channel).map(v -> TransportResponse.Empty.INSTANCE), + new ChannelActionListener<>(channel).map(v -> ActionResponse.Empty.INSTANCE), recoveryRef::close ) ); @@ -688,7 +688,7 @@ public final void messageReceived(final T request, TransportChannel channel, Tas } protected CheckedFunction responseMapping(RecoveryTarget recoveryTarget) { - return v -> TransportResponse.Empty.INSTANCE; + return v -> ActionResponse.Empty.INSTANCE; } protected abstract void handleRequest(T request, RecoveryTarget target, ActionListener listener) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index b4408aca1229d..441c83434fe41 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -17,6 +17,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.RetryableAction; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -116,7 +117,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener reader = in -> TransportResponse.Empty.INSTANCE; + final Writeable.Reader reader = in -> ActionResponse.Empty.INSTANCE; executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader); } @@ -131,7 +132,7 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe globalCheckpoint, trimAboveSeqNo ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final Writeable.Reader reader = in -> ActionResponse.Empty.INSTANCE; executeRetryableAction( action, request, @@ -148,7 +149,7 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT, new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext), standardTimeoutRequestOptions, - new ActionListenerResponseHandler<>(listener.map(r -> null), in -> TransportResponse.Empty.INSTANCE, threadPool.generic()) + new ActionListenerResponseHandler<>(listener.map(r -> null), in -> ActionResponse.Empty.INSTANCE, threadPool.generic()) ); } @@ -200,7 +201,7 @@ public void receiveFileInfo( phase1ExistingFileSizes, totalTranslogOps ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final Writeable.Reader reader = in -> ActionResponse.Empty.INSTANCE; executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader); } @@ -221,8 +222,8 @@ public void cleanFiles( totalTranslogOps, globalCheckpoint ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - final ActionListener responseListener = listener.map(r -> null); + final Writeable.Reader reader = in -> ActionResponse.Empty.INSTANCE; + final ActionListener responseListener = listener.map(r -> null); executeRetryableAction(action, request, TransportRequestOptions.EMPTY, responseListener, reader); } @@ -243,8 +244,8 @@ public void restoreFileFromSnapshot( indexId, snapshotFile ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - final ActionListener responseListener = listener.map(r -> null); + final Writeable.Reader reader = in -> ActionResponse.Empty.INSTANCE; + final ActionListener responseListener = listener.map(r -> null); executeRetryableAction(action, request, TransportRequestOptions.EMPTY, responseListener, reader); } @@ -303,8 +304,8 @@ public void writeFileChunk( threadPool.generic() .execute( ActionRunnable.wrap( - ActionListener.runBefore(listener.map(r -> null), request::decRef), - l -> executeRetryableAction(action, request, fileChunkRequestOptions, l, in -> TransportResponse.Empty.INSTANCE) + ActionListener.runBefore(listener.map(r -> null), request::decRef), + l -> executeRetryableAction(action, request, fileChunkRequestOptions, l, in -> ActionResponse.Empty.INSTANCE) ) ); } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java index 32a71bbe8a26a..7155901ca479e 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java @@ -16,6 +16,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ResultDeduplicator; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.CountDownActionListener; @@ -34,7 +35,6 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -361,7 +361,7 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC final List childTasks = taskManager.setBan(request.parentTaskId, request.reason, channel); final GroupedActionListener listener = new GroupedActionListener<>( childTasks.size() + 1, - new ChannelActionListener<>(channel).map(r -> TransportResponse.Empty.INSTANCE) + new ChannelActionListener<>(channel).map(r -> ActionResponse.Empty.INSTANCE) ); for (CancellableTask childTask : childTasks) { cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener); @@ -370,7 +370,7 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC } else { logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId, localNodeId()); taskManager.removeBan(request.parentTaskId); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } } } @@ -411,7 +411,7 @@ private class CancelChildRequestHandler implements TransportRequestHandler extends W void handleException(TransportException exp); /** - * Implementation of {@link TransportResponseHandler} that handles the empty response {@link TransportResponse.Empty}. + * Implementation of {@link TransportResponseHandler} that handles the empty response {@link ActionResponse.Empty}. */ - abstract class Empty implements TransportResponseHandler { + abstract class Empty implements TransportResponseHandler { @Override - public final TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; + public final ActionResponse.Empty read(StreamInput in) { + return ActionResponse.Empty.INSTANCE; } @Override - public final void handleResponse(TransportResponse.Empty ignored) { + public final void handleResponse(ActionResponse.Empty ignored) { handleResponse(); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 72d56dd21aac8..cb5d10ac3c5c0 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; @@ -44,7 +45,6 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; @@ -319,7 +319,7 @@ public void testUnavailableShardsMarkedAsStale() throws Exception { assertThat(capturedRequest.request(), instanceOf(ShardStateAction.FailedShardEntry.class)); String allocationId = ((ShardStateAction.FailedShardEntry) capturedRequest.request()).getAllocationId(); assertTrue(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId))); - transport.handleResponse(capturedRequest.requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequest.requestId(), ActionResponse.Empty.INSTANCE); } else if (actionName.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) { assertThat(capturedRequest.request(), instanceOf(ConcreteShardRequest.class)); diff --git a/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java index cebb842c30b77..0142992fc254d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -54,7 +55,7 @@ public void testClearAll() throws InterruptedException { @Override public void sendClearAllScrollContexts(Transport.Connection connection, ActionListener listener) { nodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(TransportResponse.Empty.INSTANCE)); // response is unused + Thread t = new Thread(() -> listener.onResponse(ActionResponse.Empty.INSTANCE)); // response is unused t.start(); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index c911b0836c127..f0b50087d5f1c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; @@ -54,7 +55,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; @@ -156,8 +156,8 @@ public void testReplicaNoRefreshCall() throws Exception { final PlainActionFuture future = new PlainActionFuture<>(); testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); - CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + CapturingActionListener listener = new CapturingActionListener<>(); + result.runPostReplicaActions(listener.map(ignore -> ActionResponse.Empty.INSTANCE)); assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); @@ -195,8 +195,8 @@ public void testReplicaImmediateRefresh() throws Exception { final PlainActionFuture future = new PlainActionFuture<>(); testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); - CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + CapturingActionListener listener = new CapturingActionListener<>(); + result.runPostReplicaActions(listener.map(ignore -> ActionResponse.Empty.INSTANCE)); assertNull(listener.response); // Haven't responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) ActionListener.class); @@ -242,8 +242,8 @@ public void testReplicaWaitForRefresh() throws Exception { final PlainActionFuture future = new PlainActionFuture<>(); testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); - CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + CapturingActionListener listener = new CapturingActionListener<>(); + result.runPostReplicaActions(listener.map(ignore -> ActionResponse.Empty.INSTANCE)); assertNull(listener.response); // Haven't responded yet @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); @@ -275,8 +275,8 @@ public void testDocumentFailureInShardOperationOnReplica() throws Exception { final PlainActionFuture future = new PlainActionFuture<>(); testAction.dispatchedShardOperationOnReplica(request, indexShard, future); final TransportReplicationAction.ReplicaResult result = future.actionGet(); - CapturingActionListener listener = new CapturingActionListener<>(); - result.runPostReplicaActions(listener.map(ignore -> TransportResponse.Empty.INSTANCE)); + CapturingActionListener listener = new CapturingActionListener<>(); + result.runPostReplicaActions(listener.map(ignore -> ActionResponse.Empty.INSTANCE)); assertNull(listener.response); assertNotNull(listener.failure); } @@ -379,7 +379,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException { assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); if (randomBoolean()) { // simulate success - transport.handleResponse(shardFailedRequest.requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(shardFailedRequest.requestId(), ActionResponse.Empty.INSTANCE); assertTrue(success.get()); assertNull(failure.get()); } else if (randomBoolean()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 3d534d9a74ad2..b189d7796b51a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -43,7 +44,6 @@ import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; @@ -179,7 +179,7 @@ public void testSuccess() throws InterruptedException { // sent to the master assertEquals(clusterService.state().nodes().getMasterNode().getId(), capturedRequests[0].node().getId()); - transport.handleResponse(capturedRequests[0].requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequests[0].requestId(), ActionResponse.Empty.INSTANCE); listener.await(); assertNull(listener.failure.get()); @@ -305,7 +305,7 @@ public void testShardNotFound() throws InterruptedException { shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - transport.handleResponse(capturedRequests[0].requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequests[0].requestId(), ActionResponse.Empty.INSTANCE); listener.await(); assertNull(listener.failure.get()); @@ -377,7 +377,7 @@ public void onFailure(Exception e) { } CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests, arrayWithSize(1)); - transport.handleResponse(capturedRequests[0].requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequests[0].requestId(), ActionResponse.Empty.INSTANCE); latch.await(); assertThat(transport.capturedRequests(), arrayWithSize(0)); } @@ -417,7 +417,7 @@ public void onFailure(Exception e) { CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests, arrayWithSize(expectedRequests)); for (int i = 0; i < expectedRequests; i++) { - transport.handleResponse(capturedRequests[i].requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequests[i].requestId(), ActionResponse.Empty.INSTANCE); } latch.await(); assertThat(transport.capturedRequests(), arrayWithSize(0)); @@ -439,7 +439,7 @@ public void testRemoteShardFailedConcurrently() throws Exception { while (shutdown.get() == false) { for (CapturingTransport.CapturedRequest request : transport.getCapturedRequestsAndClear()) { if (randomBoolean()) { - transport.handleResponse(request.requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(request.requestId(), ActionResponse.Empty.INSTANCE); } else { transport.handleRemoteError(request.requestId(), randomFrom(getSimulatedFailure())); } @@ -504,7 +504,7 @@ public void testShardStarted() throws InterruptedException { assertThat(entry.primaryTerm, equalTo(primaryTerm)); assertThat(entry.timestampRange, sameInstance(ShardLongFieldRange.UNKNOWN)); - transport.handleResponse(capturedRequests[0].requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequests[0].requestId(), ActionResponse.Empty.INSTANCE); listener.await(); assertNull(listener.failure.get()); } @@ -537,7 +537,7 @@ private void verifyRetry(int numberOfRetries, AtomicInteger retries, CountDownLa retries.incrementAndGet(); if (retries.get() == numberOfRetries) { // finish the request - transport.handleResponse(capturedRequests[0].requestId(), TransportResponse.Empty.INSTANCE); + transport.handleResponse(capturedRequests[0].requestId(), ActionResponse.Empty.INSTANCE); } else { retryLoop.accept(capturedRequests[0].requestId()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java index 1939aff081cd3..c23f1f214e4ae 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.coordination.Coordinator.Mode; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; @@ -34,8 +36,6 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -377,7 +377,7 @@ public void testFailsNodeThatIsUnhealthy() { private void testBehaviourOfFailingNode( Settings testSettings, - Supplier responder, + Supplier responder, String failureReason, long expectedFailureTime, NodeHealthService nodeHealthService diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 0d45d23433ead..6672815c8dbcd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.NotMasterException; @@ -38,7 +39,6 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.util.HashSet; @@ -185,7 +185,7 @@ public void testJoinDeduplication() { private void completeJoinRequest(CapturingTransport capturingTransport, CapturedRequest request, boolean mightSucceed) { if (mightSucceed && randomBoolean()) { - capturingTransport.handleResponse(request.requestId(), TransportResponse.Empty.INSTANCE); + capturingTransport.handleResponse(request.requestId(), ActionResponse.Empty.INSTANCE); } else { capturingTransport.handleRemoteError(request.requestId(), new CoordinationStateRejectedException("dummy")); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java index 226f5dbf3b2ff..e4102585e162a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -124,7 +125,7 @@ public void onRejection(Exception e) { public void doRun() { handleResponse(requestId, switch (action) { case JoinValidationService.JOIN_VALIDATE_ACTION_NAME, JoinHelper.JOIN_PING_ACTION_NAME -> - TransportResponse.Empty.INSTANCE; + ActionResponse.Empty.INSTANCE; case TransportService.HANDSHAKE_ACTION_NAME -> new TransportService.HandshakeResponse( Version.CURRENT, Build.current().hash(), @@ -402,12 +403,12 @@ public void testJoinValidationRejectsMismatchedClusterUUID() throws IOException .metadata(Metadata.builder().generateClusterUuidIfNeeded()) .build(); - final var future = new PlainActionFuture(); + final var future = new PlainActionFuture(); transportService.sendRequest( localNode, JoinValidationService.JOIN_VALIDATE_ACTION_NAME, serializeClusterState(otherClusterState), - new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER) + new ActionListenerResponseHandler<>(future, in -> ActionResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER) ); deterministicTaskQueue.runAllTasks(); @@ -470,12 +471,12 @@ public void testJoinValidationRunsJoinValidators() { transportService.start(); transportService.acceptIncomingRequests(); - final var future = new PlainActionFuture(); + final var future = new PlainActionFuture(); transportService.sendRequest( localNode, JoinValidationService.JOIN_VALIDATE_ACTION_NAME, serializeClusterState(stateForValidation), - new ActionListenerResponseHandler<>(future, in -> TransportResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER) + new ActionListenerResponseHandler<>(future, in -> ActionResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER) ); deterministicTaskQueue.runAllTasks(); @@ -495,7 +496,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assertSame(node, joiningNode); assertEquals(JoinHelper.JOIN_PING_ACTION_NAME, action); assertTrue(pingSeen.compareAndSet(false, true)); - handleResponse(requestId, TransportResponse.Empty.INSTANCE); + handleResponse(requestId, ActionResponse.Empty.INSTANCE); } }; final var masterTransportService = masterTransport.createTransportService( diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index 62cf54f098575..5aa22042345d6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -13,6 +13,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.coordination.LeaderChecker.LeaderCheckRequest; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -33,8 +35,6 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -554,7 +554,7 @@ public void testLeaderBehaviour() { } } - private static class CapturingTransportResponseHandler implements TransportResponseHandler { + private static class CapturingTransportResponseHandler implements TransportResponseHandler { TransportException transportException; boolean successfulResponseReceived; @@ -565,7 +565,7 @@ private CapturingTransportResponseHandler(Executor executor) { } @Override - public void handleResponse(TransportResponse.Empty response) { + public void handleResponse(ActionResponse.Empty response) { successfulResponseReceived = true; } @@ -580,8 +580,8 @@ public Executor executor() { } @Override - public TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; + public ActionResponse.Empty read(StreamInput in) { + return ActionResponse.Empty.INSTANCE; } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index f65b5c2c35fb3..4ff247ff835fe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -196,7 +197,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req ); } else if (action.equals(JoinValidationService.JOIN_VALIDATE_ACTION_NAME) || action.equals(JoinHelper.JOIN_PING_ACTION_NAME)) { - handleResponse(requestId, TransportResponse.Empty.INSTANCE); + handleResponse(requestId, ActionResponse.Empty.INSTANCE); } else { super.onSendRequest(requestId, action, request, destination); } diff --git a/server/src/test/java/org/elasticsearch/rest/action/RestBuilderListenerTests.java b/server/src/test/java/org/elasticsearch/rest/action/RestBuilderListenerTests.java index d278e027e857f..68f638d6fbd40 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/RestBuilderListenerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/RestBuilderListenerTests.java @@ -9,14 +9,14 @@ package org.elasticsearch.rest.action; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionResponse.Empty; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xcontent.XContentBuilder; import java.util.concurrent.atomic.AtomicReference; @@ -28,7 +28,7 @@ public class RestBuilderListenerTests extends ESTestCase { public void testXContentBuilderClosedInBuildResponse() throws Exception { AtomicReference builderAtomicReference = new AtomicReference<>(); - RestBuilderListener builderListener = new RestBuilderListener<>( + RestBuilderListener builderListener = new RestBuilderListener<>( new FakeRestChannel(new FakeRestRequest(), randomBoolean(), 1) ) { @Override @@ -46,7 +46,7 @@ public RestResponse buildResponse(Empty empty, XContentBuilder builder) { public void testXContentBuilderNotClosedInBuildResponseAssertionsDisabled() throws Exception { AtomicReference builderAtomicReference = new AtomicReference<>(); - RestBuilderListener builderListener = new RestBuilderListener<>( + RestBuilderListener builderListener = new RestBuilderListener<>( new FakeRestChannel(new FakeRestRequest(), randomBoolean(), 1) ) { @Override @@ -70,7 +70,7 @@ boolean assertBuilderClosed(XContentBuilder xContentBuilder) { public void testXContentBuilderNotClosedInBuildResponseAssertionsEnabled() { assumeTrue("tests are not being run with assertions", RestBuilderListener.class.desiredAssertionStatus()); - RestBuilderListener builderListener = new RestBuilderListener<>( + RestBuilderListener builderListener = new RestBuilderListener<>( new FakeRestChannel(new FakeRestRequest(), randomBoolean(), 1) ) { @Override diff --git a/server/src/test/java/org/elasticsearch/transport/TaskTransportChannelTests.java b/server/src/test/java/org/elasticsearch/transport/TaskTransportChannelTests.java index 7fe58b3ce94b6..4b8a3ede9b0bb 100644 --- a/server/src/test/java/org/elasticsearch/transport/TaskTransportChannelTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TaskTransportChannelTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.test.ESTestCase; @@ -21,7 +22,7 @@ public class TaskTransportChannelTests extends ESTestCase { public void testClosesTaskAfterChannelHandoff() throws IOException { runCompletionOrderTest(c -> c.sendResponse(new ElasticsearchException("simulated"))); - runCompletionOrderTest(c -> c.sendResponse(TransportResponse.Empty.INSTANCE)); + runCompletionOrderTest(c -> c.sendResponse(ActionResponse.Empty.INSTANCE)); } private void runCompletionOrderTest(CheckedConsumer channelConsumer) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java index 0cd1abb431fdd..3d7eb3a8d93b1 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.Build; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -71,7 +72,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req testActionName, EsExecutors.DIRECT_EXECUTOR_SERVICE, EmptyRequest::new, - (request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE) + (request, channel, task) -> channel.sendResponse(ActionResponse.Empty.INSTANCE) ); transportService.start(); @@ -89,14 +90,14 @@ protected void onSendRequest(long requestId, String action, TransportRequest req testActionName, new EmptyRequest(), TransportRequestOptions.EMPTY, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @Override - public void handleResponse(TransportResponse.Empty response) { + public void handleResponse(ActionResponse.Empty response) { fail("should not be called"); } @@ -106,7 +107,7 @@ public void handleException(TransportException exp) { } @Override - public TransportResponse.Empty read(StreamInput in) { + public ActionResponse.Empty read(StreamInput in) { throw new AssertionError("should not be called"); } @@ -155,14 +156,14 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, new EmptyRequest(), parentTask, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; } @Override - public void handleResponse(TransportResponse.Empty response) { + public void handleResponse(ActionResponse.Empty response) { fail("should not be called"); } @@ -172,7 +173,7 @@ public void handleException(TransportException exp) { } @Override - public TransportResponse.Empty read(StreamInput in) { + public ActionResponse.Empty read(StreamInput in) { throw new AssertionError("should not be called"); } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java index 5eeb6f65a03bb..790ba4e0a2f2d 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -79,14 +80,14 @@ public void testHandlersCompleteAtShutdown() throws Exception { randomFrom(random, nodeA, nodeB).transportService.getLocalNode(), TestNode.randomActionName(random), new EmptyRequest(), - new TransportResponseHandler() { + new TransportResponseHandler() { final AtomicBoolean completed = new AtomicBoolean(); final Executor executor = nodeB.randomExecutor(); @Override - public void handleResponse(TransportResponse.Empty response) { + public void handleResponse(ActionResponse.Empty response) { assertTrue(completed.compareAndSet(false, true)); requestPermits.release(); } @@ -98,8 +99,8 @@ public void handleException(TransportException exp) { } @Override - public TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; + public ActionResponse.Empty read(StreamInput in) { + return ActionResponse.Empty.INSTANCE; } @Override @@ -132,7 +133,7 @@ public void testInternalSendExceptionForksToHandlerExecutor() { final var deterministicTaskQueue = new DeterministicTaskQueue(); try (var nodeA = new TestNode("node-A")) { - final var future = new PlainActionFuture(); + final var future = new PlainActionFuture(); nodeA.transportService.sendRequest( nodeA.getThrowingConnection(), TestNode.randomActionName(random()), @@ -156,7 +157,7 @@ public void testInternalSendExceptionForksToGenericIfHandlerDoesNotForkAndStackO Settings.builder().put(TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE.getKey(), true).build() ) ) { - final var future = new PlainActionFuture(); + final var future = new PlainActionFuture(); nodeA.transportService.sendRequest( nodeA.getThrowingConnection(), TestNode.randomActionName(random()), @@ -183,7 +184,7 @@ public void testInternalSendExceptionWithNonForkingResponseHandlerCompletesListe "simulated exception in sendRequest", safeAwaitAndUnwrapFailure( IOException.class, - TransportResponse.Empty.class, + ActionResponse.Empty.class, l -> nodeA.transportService.sendRequest( nodeA.getThrowingConnection(), TestNode.randomActionName(random()), @@ -212,7 +213,7 @@ public void testInternalSendExceptionForcesExecutionOnHandlerExecutor() { } } - final var future = new PlainActionFuture(); + final var future = new PlainActionFuture(); try { nodeA.transportService.sendRequest( nodeA.getThrowingConnection(), @@ -239,7 +240,7 @@ public void testInternalSendExceptionCompletesHandlerOnCallingThreadIfTransportS nodeA.close(); final var testThread = Thread.currentThread(); - final var future = new PlainActionFuture(); + final var future = new PlainActionFuture(); nodeA.transportService.sendRequest( nodeA.getThrowingConnection(), TestNode.randomActionName(random()), @@ -400,7 +401,7 @@ public ExecutorService executor(String name) { EmptyRequest::new, (request, channel, task) -> { if (randomBoolean()) { - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } else { channel.sendResponse(new ElasticsearchException("simulated")); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 11fe07e005b9c..fdc7482a258ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -19,6 +19,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.PlainActionFuture; @@ -518,7 +519,7 @@ public void testMessageListeners() throws Exception { final TransportRequestHandler requestHandler = (request, channel, task) -> { try { if (randomBoolean()) { - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } else { channel.sendResponse(new ElasticsearchException("simulated")); } @@ -652,7 +653,7 @@ public void testVoidMessageCompressed() throws Exception { EmptyRequest::new, (request, channel, task) -> { try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } catch (Exception e) { logger.error("Unexpected failure", e); fail(e.getMessage()); @@ -670,15 +671,15 @@ public void testVoidMessageCompressed() throws Exception { ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress); connectToNode(serviceC, serviceA.getLocalNode(), connectionProfile); - Future res = submitRequest( + Future res = submitRequest( serviceC, nodeA, "internal:sayHello", new EmptyRequest(), new TransportResponseHandler<>() { @Override - public TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; + public ActionResponse.Empty read(StreamInput in) { + return ActionResponse.Empty.INSTANCE; } @Override @@ -687,7 +688,7 @@ public Executor executor() { } @Override - public void handleResponse(TransportResponse.Empty response) {} + public void handleResponse(ActionResponse.Empty response) {} @Override public void handleException(TransportException exp) { @@ -1085,7 +1086,7 @@ public void testNotifyOnShutdown() throws Exception { } } ); - Future foobar = submitRequest( + Future foobar = submitRequest( serviceB, nodeA, "internal:foobar", @@ -1974,7 +1975,7 @@ public void testRejectEarlyIncomingRequests() throws Exception { TestRequest::new, (request, channel, task) -> { requestProcessed.set(true); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } ); @@ -2499,7 +2500,7 @@ public void testResponseHeadersArePreserved() throws InterruptedException { if ("fail".equals(request.info)) { throw new RuntimeException("boom"); } else { - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } } ); @@ -2512,13 +2513,13 @@ public void testResponseHeadersArePreserved() throws InterruptedException { @Override public TransportResponse read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; + return ActionResponse.Empty.INSTANCE; } @Override public void handleResponse(TransportResponse response) { try { - assertSame(response, TransportResponse.Empty.INSTANCE); + assertSame(response, ActionResponse.Empty.INSTANCE); assertTrue(threadPool.getThreadContext().getResponseHeaders().containsKey("foo.bar")); assertEquals(1, threadPool.getThreadContext().getResponseHeaders().get("foo.bar").size()); assertEquals("baz", threadPool.getThreadContext().getResponseHeaders().get("foo.bar").get(0)); @@ -2568,7 +2569,7 @@ public void testHandlerIsInvokedOnConnectionClose() throws IOException, Interrup TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override public TransportResponse read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; + return ActionResponse.Empty.INSTANCE; } @Override @@ -2641,13 +2642,13 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { receivedLatch.countDown(); sendResponseLatch.await(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } }); } ); CountDownLatch responseLatch = new CountDownLatch(1); - TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { + TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { @Override public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; @@ -2709,13 +2710,13 @@ public void onFailure(Exception e) { protected void doRun() throws Exception { receivedLatch.countDown(); sendResponseLatch.await(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); } }); } ); CountDownLatch responseLatch = new CountDownLatch(1); - TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { + TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { @Override public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; @@ -2829,7 +2830,7 @@ protected void doRun() throws Exception { ); CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference receivedException = new AtomicReference<>(null); - TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { + TransportResponseHandler transportResponseHandler = new TransportResponseHandler.Empty() { @Override public Executor executor() { return TransportResponseHandler.TRANSPORT_WORKER; @@ -3179,7 +3180,7 @@ public void testChannelToString() { ) ); assertThat(new ChannelActionListener<>(channel).toString(), containsString(channel.toString())); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); }); serviceB.registerRequestHandler(ACTION, EsExecutors.DIRECT_EXECUTOR_SERVICE, EmptyRequest::new, (request, channel, task) -> { assertThat( @@ -3192,7 +3193,7 @@ public void testChannelToString() { containsString(serviceB.getLocalNode().getAddress().toString()) ) ); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); }); safeAwait( @@ -3203,7 +3204,7 @@ public void testChannelToString() { new EmptyRequest(), new ActionListenerResponseHandler<>( listener, - ignored -> TransportResponse.Empty.INSTANCE, + ignored -> ActionResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER ) ) @@ -3217,7 +3218,7 @@ public void testChannelToString() { new EmptyRequest(), new ActionListenerResponseHandler<>( listener, - ignored -> TransportResponse.Empty.INSTANCE, + ignored -> ActionResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER ) ) @@ -3331,7 +3332,7 @@ public void testWatchdogLogging() { serviceA.registerRequestHandler(actionName, EsExecutors.DIRECT_EXECUTOR_SERVICE, EmptyRequest::new, (request, channel, task) -> { threadNameFuture.onResponse(Thread.currentThread().getName()); safeAwait(barrier); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + channel.sendResponse(ActionResponse.Empty.INSTANCE); }); final var responseLatch = new CountDownLatch(1); @@ -3342,7 +3343,7 @@ public void testWatchdogLogging() { new EmptyRequest(), new ActionListenerResponseHandler( ActionTestUtils.assertNoFailureListener(t -> responseLatch.countDown()), - in -> TransportResponse.Empty.INSTANCE, + in -> ActionResponse.Empty.INSTANCE, EsExecutors.DIRECT_EXECUTOR_SERVICE ) ); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java index 38d879f8f7ad4..409f5ed39c017 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.common.io.stream.StreamInput; @@ -20,7 +21,6 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; @@ -117,13 +117,13 @@ public Status getStatus() { private record DriverRequestHandler(TransportService transportService) implements TransportRequestHandler { @Override public void messageReceived(DriverRequest request, TransportChannel channel, Task task) { - var listener = new ChannelActionListener(channel); + var listener = new ChannelActionListener(channel); Driver.start( transportService.getThreadPool().getThreadContext(), request.executor, request.driver, Driver.DEFAULT_MAX_ITERATIONS, - listener.map(unused -> TransportResponse.Empty.INSTANCE) + listener.map(unused -> ActionResponse.Empty.INSTANCE) ); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 73e9cc9d54551..56536261f834d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -13,6 +13,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -37,7 +38,6 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.Transports; @@ -171,7 +171,7 @@ public static void openExchange( OPEN_EXCHANGE_ACTION_NAME, new OpenExchangeRequest(sessionId, exchangeBuffer), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener.map(unused -> null), in -> TransportResponse.Empty.INSTANCE, responseExecutor) + new ActionListenerResponseHandler<>(listener.map(unused -> null), in -> ActionResponse.Empty.INSTANCE, responseExecutor) ); } @@ -228,7 +228,7 @@ private class OpenExchangeRequestHandler implements TransportRequestHandler channel.sendResponse(TransportResponse.Empty.INSTANCE), + (request, channel, task) -> channel.sendResponse(ActionResponse.Empty.INSTANCE), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, true,