diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index af3fdc317c8a7..37bc9a6cae920 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -45,6 +45,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -106,13 +107,36 @@ public PublicationTransportHandler( this.namedWriteableRegistry = namedWriteableRegistry; this.handlePublishRequest = handlePublishRequest; + var generic = transportService.getThreadPool().generic(); transportService.registerRequestHandler( PUBLISH_STATE_ACTION_NAME, - transportService.getThreadPool().generic(), + TransportResponseHandler.TRANSPORT_WORKER, false, false, BytesTransportRequest::new, - (request, channel, task) -> this.handleIncomingPublishRequest(request, new ChannelActionListener<>(channel)) + (req, channel, task) -> { + req.mustIncRef(); + generic.execute(new ActionRunnable<>(new ChannelActionListener(channel)) { + + private BytesTransportRequest request = req; + + @Override + protected void doRun() throws IOException { + var request = this.request; + this.request = null; + handleIncomingPublishRequest(request, listener); + } + + @Override + public void onFailure(Exception e) { + if (request != null) { + request.decRef(); + this.request = null; + } + super.onFailure(e); + } + }); + } ); } @@ -129,10 +153,12 @@ private void handleIncomingPublishRequest( BytesTransportRequest request, ActionListener publishResponseListener ) throws IOException { - assert ThreadPool.assertCurrentThreadPool(GENERIC); - final Compressor compressor = CompressorFactory.compressor(request.bytes()); - StreamInput in = request.bytes().streamInput(); + var bytes = request.bytes(); + StreamInput in = bytes.streamInput(); try { + assert ThreadPool.assertCurrentThreadPool(GENERIC); + final Compressor compressor = CompressorFactory.compressor(bytes); + final int requestLength = bytes.length(); if (compressor != null) { in = compressor.threadLocalStreamInput(in); } @@ -150,8 +176,10 @@ private void handleIncomingPublishRequest( assert false : e; throw e; } + request.decRef(); + request = null; fullClusterStateReceivedCount.incrementAndGet(); - logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); + logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), requestLength); acceptState(incomingState, publishResponseListener.map(response -> { lastSeenClusterState.set(incomingState); return response; @@ -164,12 +192,14 @@ private void handleIncomingPublishRequest( throw new IncompatibleClusterStateVersionException("have no local cluster state"); } else { final ClusterState incomingState = deserializeAndApplyDiff(request, in, lastSeen); + request.decRef(); + request = null; compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug( "received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), - request.bytes().length() + requestLength ); acceptState(incomingState, publishResponseListener.map(response -> { lastSeenClusterState.compareAndSet(lastSeen, incomingState); @@ -178,6 +208,9 @@ private void handleIncomingPublishRequest( } } } finally { + if (request != null) { + request.decRef(); + } IOUtils.close(in); } }