Skip to content

Replace TransportResponse.Empty with ActionResponse.Empty #126400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.internal.node.NodeClient;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void testThreadWatchdogTransportLogging() {
EmptyRequest::new,
(request, channel, task) -> {
blockAndWaitForWatchdogLogs();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(ActionResponse.Empty.INSTANCE);
}
);

Expand All @@ -149,7 +150,7 @@ public void testThreadWatchdogTransportLogging() {
new EmptyRequest(),
new ActionListenerResponseHandler<TransportResponse>(
l,
in -> TransportResponse.Empty.INSTANCE,
in -> ActionResponse.Empty.INSTANCE,
EsExecutors.DIRECT_EXECUTOR_SERVICE
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
Expand Down Expand Up @@ -161,7 +162,7 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
new ClearScrollContextsRequest(),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, in -> TransportResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER)
new ActionListenerResponseHandler<>(listener, in -> ActionResponse.Empty.INSTANCE, TransportResponseHandler.TRANSPORT_WORKER)
);
}

Expand Down Expand Up @@ -418,14 +419,14 @@ public static void registerRequestHandler(TransportService transportService, Sea
ClearScrollContextsRequest::new,
(request, channel, task) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(ActionResponse.Empty.INSTANCE);
}
);
TransportActionProxy.registerProxyAction(
transportService,
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
false,
(in) -> TransportResponse.Empty.INSTANCE
(in) -> ActionResponse.Empty.INSTANCE
);

transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -57,7 +58,6 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -291,7 +291,7 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel,
logger.debug(() -> format("%s received shard failed for [%s]", request.getShardId(), request), request.failure);
taskQueue.submitTask(
"shard-failed " + request.toStringNoFailureStackTrace(),
new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
new FailedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> ActionResponse.Empty.INSTANCE)),
null
);
}
Expand Down Expand Up @@ -602,7 +602,7 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel,
logger.debug("{} received shard started for [{}]", request.shardId, request);
taskQueue.submitTask(
"shard-started " + request,
new StartedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> TransportResponse.Empty.INSTANCE)),
new StartedShardUpdateTask(request, new ChannelActionListener<>(channel).map(ignored -> ActionResponse.Empty.INSTANCE)),
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse.Empty;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
Expand Down Expand Up @@ -77,7 +78,6 @@
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse.Empty;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
Expand All @@ -37,7 +38,6 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse.Empty;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
Expand Down Expand Up @@ -45,7 +46,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -39,7 +40,6 @@
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -143,7 +143,7 @@ public JoinValidationService(
);
}
joinValidators.forEach(joinValidator -> joinValidator.accept(transportService.getLocalNode(), remoteState));
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(ActionResponse.Empty.INSTANCE);
}
);
}
Expand Down Expand Up @@ -343,7 +343,7 @@ protected void doRun() {
REQUEST_OPTIONS,
new CleanableResponseHandler<>(
listener.map(ignored -> null),
in -> TransportResponse.Empty.INSTANCE,
in -> ActionResponse.Empty.INSTANCE,
responseExecutor,
bytes::decRef
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse.Empty;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -36,7 +37,6 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportRequestOptions.Type;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ public void messageReceived(final RecoveryHandoffPrimaryContextRequest request,
.handoffPrimaryContext(
request.primaryContext(),
ActionListener.runBefore(
new ChannelActionListener<>(channel).map(v -> TransportResponse.Empty.INSTANCE),
new ChannelActionListener<>(channel).map(v -> ActionResponse.Empty.INSTANCE),
recoveryRef::close
)
);
Expand Down Expand Up @@ -688,7 +688,7 @@ public final void messageReceived(final T request, TransportChannel channel, Tas
}

protected CheckedFunction<Void, TransportResponse, Exception> responseMapping(RecoveryTarget recoveryTarget) {
return v -> TransportResponse.Empty.INSTANCE;
return v -> ActionResponse.Empty.INSTANCE;
}

protected abstract void handleRequest(T request, RecoveryTarget target, ActionListener<Void> listener) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
shardId,
totalTranslogOps
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final Writeable.Reader<ActionResponse.Empty> reader = in -> ActionResponse.Empty.INSTANCE;
executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader);
}

Expand All @@ -131,7 +132,7 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe
globalCheckpoint,
trimAboveSeqNo
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final Writeable.Reader<ActionResponse.Empty> reader = in -> ActionResponse.Empty.INSTANCE;
executeRetryableAction(
action,
request,
Expand All @@ -148,7 +149,7 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,
new RecoveryHandoffPrimaryContextRequest(recoveryId, shardId, primaryContext),
standardTimeoutRequestOptions,
new ActionListenerResponseHandler<>(listener.map(r -> null), in -> TransportResponse.Empty.INSTANCE, threadPool.generic())
new ActionListenerResponseHandler<>(listener.map(r -> null), in -> ActionResponse.Empty.INSTANCE, threadPool.generic())
);
}

Expand Down Expand Up @@ -200,7 +201,7 @@ public void receiveFileInfo(
phase1ExistingFileSizes,
totalTranslogOps
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final Writeable.Reader<ActionResponse.Empty> reader = in -> ActionResponse.Empty.INSTANCE;
executeRetryableAction(action, request, standardTimeoutRequestOptions, listener.map(r -> null), reader);
}

Expand All @@ -221,8 +222,8 @@ public void cleanFiles(
totalTranslogOps,
globalCheckpoint
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = listener.map(r -> null);
final Writeable.Reader<ActionResponse.Empty> reader = in -> ActionResponse.Empty.INSTANCE;
final ActionListener<ActionResponse.Empty> responseListener = listener.map(r -> null);
executeRetryableAction(action, request, TransportRequestOptions.EMPTY, responseListener, reader);
}

Expand All @@ -243,8 +244,8 @@ public void restoreFileFromSnapshot(
indexId,
snapshotFile
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = listener.map(r -> null);
final Writeable.Reader<ActionResponse.Empty> reader = in -> ActionResponse.Empty.INSTANCE;
final ActionListener<ActionResponse.Empty> responseListener = listener.map(r -> null);
executeRetryableAction(action, request, TransportRequestOptions.EMPTY, responseListener, reader);
}

Expand Down Expand Up @@ -303,8 +304,8 @@ public void writeFileChunk(
threadPool.generic()
.execute(
ActionRunnable.wrap(
ActionListener.<TransportResponse.Empty>runBefore(listener.map(r -> null), request::decRef),
l -> executeRetryableAction(action, request, fileChunkRequestOptions, l, in -> TransportResponse.Empty.INSTANCE)
ActionListener.<ActionResponse.Empty>runBefore(listener.map(r -> null), request::decRef),
l -> executeRetryableAction(action, request, fileChunkRequestOptions, l, in -> ActionResponse.Empty.INSTANCE)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.CountDownActionListener;
Expand All @@ -34,7 +35,6 @@
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -361,7 +361,7 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC
final List<CancellableTask> childTasks = taskManager.setBan(request.parentTaskId, request.reason, channel);
final GroupedActionListener<Void> listener = new GroupedActionListener<>(
childTasks.size() + 1,
new ChannelActionListener<>(channel).map(r -> TransportResponse.Empty.INSTANCE)
new ChannelActionListener<>(channel).map(r -> ActionResponse.Empty.INSTANCE)
);
for (CancellableTask childTask : childTasks) {
cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener);
Expand All @@ -370,7 +370,7 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC
} else {
logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId, localNodeId());
taskManager.removeBan(request.parentTaskId);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(ActionResponse.Empty.INSTANCE);
}
}
}
Expand Down Expand Up @@ -411,7 +411,7 @@ private class CancelChildRequestHandler implements TransportRequestHandler<Cance
@Override
public void messageReceived(final CancelChildRequest request, final TransportChannel channel, Task task) throws Exception {
taskManager.cancelChildLocal(request.parentTaskId, request.childRequestId, request.reason);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
channel.sendResponse(ActionResponse.Empty.INSTANCE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,9 @@

package org.elasticsearch.transport;

import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public abstract class TransportResponse extends TransportMessage {

/**
* Constructs a new empty transport response
*/
public TransportResponse() {}

public static class Empty extends TransportResponse {
public static final Empty INSTANCE = new Empty();

private Empty() {/* singleton */}

@Override
public String toString() {
return "Empty{}";
}

@Override
public void writeTo(StreamOutput out) throws IOException {}
}
protected TransportResponse() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -31,16 +32,16 @@ public interface TransportResponseHandler<T extends TransportResponse> extends W
void handleException(TransportException exp);

/**
* Implementation of {@link TransportResponseHandler} that handles the empty response {@link TransportResponse.Empty}.
* Implementation of {@link TransportResponseHandler} that handles the empty response {@link ActionResponse.Empty}.
*/
abstract class Empty implements TransportResponseHandler<TransportResponse.Empty> {
abstract class Empty implements TransportResponseHandler<ActionResponse.Empty> {
@Override
public final TransportResponse.Empty read(StreamInput in) {
return TransportResponse.Empty.INSTANCE;
public final ActionResponse.Empty read(StreamInput in) {
return ActionResponse.Empty.INSTANCE;
}

@Override
public final void handleResponse(TransportResponse.Empty ignored) {
public final void handleResponse(ActionResponse.Empty ignored) {
handleResponse();
}

Expand Down
Loading