diff --git a/rsocket-core/src/jcstress/java/io/rsocket/resume/InMemoryResumableFramesStoreStressTest.java b/rsocket-core/src/jcstress/java/io/rsocket/resume/InMemoryResumableFramesStoreStressTest.java new file mode 100644 index 000000000..f0b209552 --- /dev/null +++ b/rsocket-core/src/jcstress/java/io/rsocket/resume/InMemoryResumableFramesStoreStressTest.java @@ -0,0 +1,118 @@ +package io.rsocket.resume; + +import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.rsocket.exceptions.ConnectionErrorException; +import io.rsocket.frame.ErrorFrameCodec; +import io.rsocket.frame.PayloadFrameCodec; +import io.rsocket.internal.UnboundedProcessor; +import org.openjdk.jcstress.annotations.Actor; +import org.openjdk.jcstress.annotations.Arbiter; +import org.openjdk.jcstress.annotations.JCStressTest; +import org.openjdk.jcstress.annotations.Outcome; +import org.openjdk.jcstress.annotations.State; +import org.openjdk.jcstress.infra.results.LL_Result; +import reactor.core.Disposable; + +public class InMemoryResumableFramesStoreStressTest { + boolean storeClosed; + + InMemoryResumableFramesStore store = + new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 128); + boolean processorClosed; + UnboundedProcessor processor = new UnboundedProcessor(() -> processorClosed = true); + + void subscribe() { + store.saveFrames(processor).subscribe(); + store.onClose().subscribe(null, t -> storeClosed = true, () -> storeClosed = true); + } + + @JCStressTest + @Outcome( + id = {"true, true"}, + expect = ACCEPTABLE) + @State + public static class TwoSubscribesRaceStressTest extends InMemoryResumableFramesStoreStressTest { + + Disposable d1; + + final ByteBuf b1 = + PayloadFrameCodec.encode( + ByteBufAllocator.DEFAULT, + 1, + false, + true, + false, + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello1"), + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello2")); + final ByteBuf b2 = + PayloadFrameCodec.encode( + ByteBufAllocator.DEFAULT, + 3, + false, + true, + false, + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello3"), + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello4")); + final ByteBuf b3 = + PayloadFrameCodec.encode( + ByteBufAllocator.DEFAULT, + 5, + false, + true, + false, + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello5"), + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello6")); + + final ByteBuf c1 = + ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new ConnectionErrorException("closed")); + + { + subscribe(); + d1 = store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {}); + } + + @Actor + public void producer1() { + processor.tryEmitNormal(b1); + processor.tryEmitNormal(b2); + processor.tryEmitNormal(b3); + } + + @Actor + public void producer2() { + processor.tryEmitFinal(c1); + } + + @Actor + public void producer3() { + d1.dispose(); + store + .doOnDiscard(ByteBuf.class, ByteBuf::release) + .subscribe(ByteBuf::release, t -> {}) + .dispose(); + store + .doOnDiscard(ByteBuf.class, ByteBuf::release) + .subscribe(ByteBuf::release, t -> {}) + .dispose(); + store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {}); + } + + @Actor + public void producer4() { + store.releaseFrames(0); + store.releaseFrames(0); + store.releaseFrames(0); + } + + @Arbiter + public void arbiter(LL_Result r) { + r.r1 = storeClosed; + r.r2 = processorClosed; + } + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java index d6cb46d98..e19d31924 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java @@ -67,8 +67,8 @@ public ClientServerInputMultiplexer( this.source = source; this.isClient = isClient; - this.serverReceiver = new InternalDuplexConnection(this, source); - this.clientReceiver = new InternalDuplexConnection(this, source); + this.serverReceiver = new InternalDuplexConnection(Type.SERVER, this, source); + this.clientReceiver = new InternalDuplexConnection(Type.CLIENT, this, source); this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver); this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver); } @@ -195,8 +195,33 @@ int incrementAndGetCheckingState() { } } + @Override + public String toString() { + return "ClientServerInputMultiplexer{" + + "serverReceiver=" + + serverReceiver + + ", clientReceiver=" + + clientReceiver + + ", serverConnection=" + + serverConnection + + ", clientConnection=" + + clientConnection + + ", source=" + + source + + ", isClient=" + + isClient + + ", s=" + + s + + ", t=" + + t + + ", state=" + + state + + '}'; + } + private static class InternalDuplexConnection extends Flux implements Subscription, DuplexConnection { + private final Type type; private final ClientServerInputMultiplexer clientServerInputMultiplexer; private final DuplexConnection source; @@ -207,7 +232,10 @@ private static class InternalDuplexConnection extends Flux CoreSubscriber actual; public InternalDuplexConnection( - ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) { + Type type, + ClientServerInputMultiplexer clientServerInputMultiplexer, + DuplexConnection source) { + this.type = type; this.clientServerInputMultiplexer = clientServerInputMultiplexer; this.source = source; } @@ -304,5 +332,17 @@ public Mono onClose() { public double availability() { return source.availability(); } + + @Override + public String toString() { + return "InternalDuplexConnection{" + + "type=" + + type + + ", source=" + + source + + ", state=" + + state + + '}'; + } } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 432c0f0f5..de494c4e3 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -47,6 +47,7 @@ import java.util.function.Supplier; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.util.annotation.Nullable; import reactor.util.function.Tuples; import reactor.util.retry.Retry; @@ -633,8 +634,7 @@ public Mono connect(Supplier transportSupplier) { wrappedConnection = resumableDuplexConnection; } else { keepAliveHandler = - new KeepAliveHandler.DefaultKeepAliveHandler( - clientServerConnection); + new KeepAliveHandler.DefaultKeepAliveHandler(); wrappedConnection = clientServerConnection; } @@ -655,6 +655,11 @@ public Mono connect(Supplier transportSupplier) { requesterLeaseTracker = null; } + final Sinks.Empty requesterOnAllClosedSink = + Sinks.unsafe().empty(); + final Sinks.Empty responderOnAllClosedSink = + Sinks.unsafe().empty(); + RSocket rSocketRequester = new RSocketRequester( multiplexer.asClientConnection(), @@ -667,7 +672,11 @@ public Mono connect(Supplier transportSupplier) { (int) keepAliveMaxLifeTime.toMillis(), keepAliveHandler, interceptors::initRequesterRequestInterceptor, - requesterLeaseTracker); + requesterLeaseTracker, + requesterOnAllClosedSink, + Mono.whenDelayError( + responderOnAllClosedSink.asMono(), + requesterOnAllClosedSink.asMono())); RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester); @@ -715,7 +724,8 @@ public Mono connect(Supplier transportSupplier) { (RequestInterceptor) leases.sender) : interceptors - ::initResponderRequestInterceptor); + ::initResponderRequestInterceptor, + responderOnAllClosedSink); return wrappedRSocketRequester; }) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index bf298706a..9e8d349bf 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -66,8 +66,10 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket { RSocketRequester.class, Throwable.class, "terminationError"); @Nullable private final RequesterLeaseTracker requesterLeaseTracker; + + private final Sinks.Empty onThisSideClosedSink; + private final Mono onAllClosed; private final KeepAliveFramesAcceptor keepAliveFramesAcceptor; - private final Sinks.Empty onClose; RSocketRequester( DuplexConnection connection, @@ -80,7 +82,9 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket { int keepAliveAckTimeout, @Nullable KeepAliveHandler keepAliveHandler, Function requestInterceptorFunction, - @Nullable RequesterLeaseTracker requesterLeaseTracker) { + @Nullable RequesterLeaseTracker requesterLeaseTracker, + Sinks.Empty onThisSideClosedSink, + Mono onAllClosed) { super( mtu, maxFrameLength, @@ -91,10 +95,11 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket { requestInterceptorFunction); this.requesterLeaseTracker = requesterLeaseTracker; - this.onClose = Sinks.empty(); + this.onThisSideClosedSink = onThisSideClosedSink; + this.onAllClosed = onAllClosed; // DO NOT Change the order here. The Send processor must be subscribed to before receiving - connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown); + connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown); connection.receive().subscribe(this::handleIncomingFrames, e -> {}); @@ -188,7 +193,11 @@ public double availability() { @Override public void dispose() { - tryShutdown(); + if (terminationError != null) { + return; + } + + getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed")); } @Override @@ -198,7 +207,7 @@ public boolean isDisposed() { @Override public Mono onClose() { - return onClose.asMono(); + return onAllClosed; } private void handleIncomingFrames(ByteBuf frame) { @@ -305,8 +314,31 @@ private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) { String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()))); } - private void tryTerminateOnConnectionError(Throwable e) { - tryTerminate(() -> e); + private void tryShutdown(Throwable e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("trying to close requester " + getDuplexConnection()); + } + if (terminationError == null) { + if (TERMINATION_ERROR.compareAndSet(this, null, e)) { + terminate(CLOSED_CHANNEL_EXCEPTION); + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "trying to close requester failed because of " + + terminationError + + " " + + getDuplexConnection()); + } + } + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.info( + "trying to close requester failed because of " + + terminationError + + " " + + getDuplexConnection()); + } + } } private void tryTerminateOnZeroError(ByteBuf errorFrame) { @@ -314,27 +346,67 @@ private void tryTerminateOnZeroError(ByteBuf errorFrame) { } private void tryTerminate(Supplier errorSupplier) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("trying to close requester " + getDuplexConnection()); + } if (terminationError == null) { Throwable e = errorSupplier.get(); if (TERMINATION_ERROR.compareAndSet(this, null, e)) { terminate(e); + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "trying to close requester failed because of " + + terminationError + + " " + + getDuplexConnection()); + } + } + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "trying to close requester failed because of " + + terminationError + + " " + + getDuplexConnection()); } } } private void tryShutdown() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("trying to close requester " + getDuplexConnection()); + } if (terminationError == null) { if (TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) { terminate(CLOSED_CHANNEL_EXCEPTION); + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "trying to close requester failed because of " + + terminationError + + " " + + getDuplexConnection()); + } + } + } else { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "trying to close requester failed because of " + + terminationError + + " " + + getDuplexConnection()); } } } private void terminate(Throwable e) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("closing requester " + getDuplexConnection() + " due to " + e); + } if (keepAliveFramesAcceptor != null) { keepAliveFramesAcceptor.dispose(); } - getDuplexConnection().dispose(); final RequestInterceptor requestInterceptor = getRequestInterceptor(); if (requestInterceptor != null) { requestInterceptor.dispose(); @@ -361,9 +433,12 @@ private void terminate(Throwable e) { } if (e == CLOSED_CHANNEL_EXCEPTION) { - onClose.tryEmitEmpty(); + onThisSideClosedSink.tryEmitEmpty(); } else { - onClose.tryEmitError(e); + onThisSideClosedSink.tryEmitError(e); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("requester closed " + getDuplexConnection()); } } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index ce4fe70a3..50c5ba54c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.util.annotation.Nullable; /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ @@ -54,6 +55,7 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket { private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException(); private final RSocket requestHandler; + private final Sinks.Empty onThisSideClosedSink; @Nullable private final ResponderLeaseTracker leaseHandler; @@ -70,7 +72,8 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket { int mtu, int maxFrameLength, int maxInboundPayloadSize, - Function requestInterceptorFunction) { + Function requestInterceptorFunction, + Sinks.Empty onThisSideClosedSink) { super( mtu, maxFrameLength, @@ -83,19 +86,27 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket { this.requestHandler = requestHandler; this.leaseHandler = leaseHandler; - - connection.receive().subscribe(this::handleFrame, e -> {}); + this.onThisSideClosedSink = onThisSideClosedSink; connection .onClose() .subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose); + + connection.receive().subscribe(this::handleFrame, e -> {}); } private void tryTerminateOnConnectionError(Throwable e) { + if (LOGGER.isDebugEnabled()) { + + LOGGER.debug("Try terminate connection on responder side"); + } tryTerminate(() -> e); } private void tryTerminateOnConnectionClose() { + if (LOGGER.isDebugEnabled()) { + LOGGER.info("Try terminate connection on responder side"); + } tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION); } @@ -169,6 +180,9 @@ public Mono onClose() { } final void doOnDispose() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("closing responder " + getDuplexConnection()); + } cleanUpSendingSubscriptions(); getDuplexConnection().dispose(); @@ -183,6 +197,10 @@ final void doOnDispose() { } requestHandler.dispose(); + onThisSideClosedSink.tryEmitEmpty(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responder closed " + getDuplexConnection()); + } } private void cleanUpSendingSubscriptions() { diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 3208bb4fd..0c68db6df 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -46,6 +46,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; /** * The main class for starting an RSocket server. @@ -437,6 +438,9 @@ private Mono acceptSetup( requesterLeaseTracker = null; } + final Sinks.Empty requesterOnAllClosedSink = Sinks.unsafe().empty(); + final Sinks.Empty responderOnAllClosedSink = Sinks.unsafe().empty(); + RSocket rSocketRequester = new RSocketRequester( multiplexer.asServerConnection(), @@ -449,7 +453,10 @@ private Mono acceptSetup( setupPayload.keepAliveMaxLifetime(), keepAliveHandler, interceptors::initRequesterRequestInterceptor, - requesterLeaseTracker); + requesterLeaseTracker, + requesterOnAllClosedSink, + Mono.whenDelayError( + responderOnAllClosedSink.asMono(), requesterOnAllClosedSink.asMono())); RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester); @@ -481,7 +488,8 @@ private Mono acceptSetup( ? rSocket -> interceptors.initResponderRequestInterceptor( rSocket, (RequestInterceptor) leases.sender) - : interceptors::initResponderRequestInterceptor); + : interceptors::initResponderRequestInterceptor, + responderOnAllClosedSink); }) .doFinally(signalType -> setupPayload.release()) .then(); diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index ddad96047..5aae22e89 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -79,7 +79,7 @@ public Mono acceptRSocketSetup( sendError(duplexConnection, new UnsupportedSetupException("resume not supported")); return duplexConnection.onClose(); } else { - return then.apply(new DefaultKeepAliveHandler(duplexConnection), duplexConnection); + return then.apply(new DefaultKeepAliveHandler(), duplexConnection); } } @@ -141,7 +141,7 @@ public Mono acceptRSocketSetup( resumableDuplexConnection, serverRSocketSession, serverRSocketSession), resumableDuplexConnection); } else { - return then.apply(new DefaultKeepAliveHandler(duplexConnection), duplexConnection); + return then.apply(new DefaultKeepAliveHandler(), duplexConnection); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java index 2da572de3..3beedf97f 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java @@ -168,4 +168,9 @@ public void sendErrorAndClose(RSocketErrorException e) { public ByteBufAllocator alloc() { return source.alloc(); } + + @Override + public String toString() { + return "SetupHandlingDuplexConnection{" + "source=" + source + ", done=" + done + '}'; + } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 95bc210fe..23ada95fe 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -619,13 +619,8 @@ public ByteBuf poll() { t = this.last; if (t != null) { - try { - this.last = null; - return t; - } finally { - - clearAndFinalize(this); - } + this.last = null; + return t; } return null; diff --git a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java index b92c25f46..4fd7a772d 100644 --- a/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveHandler.java @@ -1,7 +1,6 @@ package io.rsocket.keepalive; import io.netty.buffer.ByteBuf; -import io.rsocket.Closeable; import io.rsocket.keepalive.KeepAliveSupport.KeepAlive; import io.rsocket.resume.RSocketSession; import io.rsocket.resume.ResumableDuplexConnection; @@ -16,18 +15,11 @@ KeepAliveFramesAcceptor start( Consumer onTimeout); class DefaultKeepAliveHandler implements KeepAliveHandler { - private final Closeable duplexConnection; - - public DefaultKeepAliveHandler(Closeable duplexConnection) { - this.duplexConnection = duplexConnection; - } - @Override public KeepAliveFramesAcceptor start( KeepAliveSupport keepAliveSupport, Consumer onSendKeepAliveFrame, Consumer onTimeout) { - duplexConnection.onClose().doFinally(s -> keepAliveSupport.stop()).subscribe(); return keepAliveSupport .onSendKeepAliveFrame(onSendKeepAliveFrame) .onTimeout(onTimeout) diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index d6a0c9292..ca4f5dcb4 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -110,17 +110,9 @@ public ClientRSocketSession( position); } - return connectionTransformer - .apply(dc) - .doOnDiscard( - Tuple2.class, - tuple2 -> { - if (logger.isDebugEnabled()) { - logger.debug("try to reestablish from discard"); - } - tryReestablishSession(tuple2); - }); - }); + return connectionTransformer.apply(dc); + }) + .doOnDiscard(Tuple2.class, this::tryReestablishSession); this.resumableFramesStore = resumableFramesStore; this.allocator = resumableDuplexConnection.alloc(); this.resumeSessionDuration = resumeSessionDuration; diff --git a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java index b71693f0d..e23bc154b 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java @@ -118,7 +118,7 @@ public class InMemoryResumableFramesStore extends Flux * the {@link InMemoryResumableFramesStore#drain(long)} method. */ static final long MAX_WORK_IN_PROGRESS = - 0b0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_1111_1111_1111L; + 0b0000_0000_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111L; public InMemoryResumableFramesStore(String side, ByteBuf session, int cacheSizeBytes) { this.side = side; @@ -374,7 +374,7 @@ public void dispose() { return; } - drain(previousState | DISPOSED_FLAG); + drain((previousState + 1) | DISPOSED_FLAG); } void clearCache() { @@ -557,12 +557,13 @@ public void onNext(ByteBuf byteBuf) { return; } - if (isWorkInProgress(previousState) - || (!isConnected(previousState) && !hasPendingConnection(previousState))) { + if (isWorkInProgress(previousState)) { return; } - parent.drain(previousState + 1); + if (isConnected(previousState) || hasPendingConnection(previousState)) { + parent.drain((previousState + 1) | HAS_FRAME_FLAG); + } } @Override @@ -587,7 +588,7 @@ public void onError(Throwable t) { return; } - parent.drain(previousState | TERMINATED_FLAG); + parent.drain((previousState + 1) | TERMINATED_FLAG); } @Override @@ -609,7 +610,7 @@ public void onComplete() { return; } - parent.drain(previousState | TERMINATED_FLAG); + parent.drain((previousState + 1) | TERMINATED_FLAG); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 9704d9aba..c8811b9b3 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -322,6 +322,26 @@ static boolean isResumableFrame(ByteBuf frame) { return FrameHeaderCodec.streamId(frame) != 0; } + @Override + public String toString() { + return "ResumableDuplexConnection{" + + "side='" + + side + + '\'' + + ", session='" + + session + + '\'' + + ", remoteAddress=" + + remoteAddress + + ", state=" + + state + + ", activeConnection=" + + activeConnection + + ", connectionIndex=" + + connectionIndex + + '}'; + } + private static final class DisposedConnection implements DuplexConnection { static final DisposedConnection INSTANCE = new DisposedConnection(); diff --git a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java index 3c95fd65c..fa208269b 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java @@ -643,6 +643,8 @@ public static class ClientSocketRule extends AbstractSocketRule { protected Runnable delayer; protected Sinks.One producer; + protected Sinks.Empty thisClosedSink; + @Override protected void doInit() { super.doInit(); @@ -660,6 +662,7 @@ protected void doInit() { @Override protected RSocket newRSocket() { + this.thisClosedSink = Sinks.empty(); return new RSocketRequester( connection, PayloadDecoder.ZERO_COPY, @@ -671,7 +674,9 @@ protected RSocket newRSocket() { Integer.MAX_VALUE, null, __ -> null, - null); + null, + thisClosedSink, + thisClosedSink.asMono()); } } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java index a9c9ed9a5..aad0aaaca 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java @@ -91,6 +91,8 @@ class RSocketLeaseTest { private Sinks.Many leaseSender = Sinks.many().multicast().onBackpressureBuffer(); private RequesterLeaseTracker requesterLeaseTracker; + protected Sinks.Empty thisClosedSink; + protected Sinks.Empty otherClosedSink; @BeforeEach void setUp() { @@ -100,6 +102,8 @@ void setUp() { connection = new TestDuplexConnection(byteBufAllocator); requesterLeaseTracker = new RequesterLeaseTracker(TAG, 0); responderLeaseTracker = new ResponderLeaseTracker(TAG, connection, () -> leaseSender.asFlux()); + this.thisClosedSink = Sinks.empty(); + this.otherClosedSink = Sinks.empty(); ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, new InitializingInterceptorRegistry(), true); @@ -115,7 +119,9 @@ void setUp() { 0, null, __ -> null, - requesterLeaseTracker); + requesterLeaseTracker, + thisClosedSink, + otherClosedSink.asMono().and(thisClosedSink.asMono())); mockRSocketHandler = mock(RSocket.class); when(mockRSocketHandler.metadataPush(any())) @@ -182,7 +188,8 @@ protected void hookOnError(Throwable throwable) { 0, FRAME_LENGTH_MASK, Integer.MAX_VALUE, - __ -> null); + __ -> null, + otherClosedSink); } @Test diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java index 25d91b25b..d736ae190 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java @@ -44,6 +44,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.test.util.RaceTestUtils; class RSocketRequesterSubscribersTest { @@ -60,11 +61,15 @@ class RSocketRequesterSubscribersTest { private LeaksTrackingByteBufAllocator allocator; private RSocket rSocketRequester; private TestDuplexConnection connection; + protected Sinks.Empty thisClosedSink; + protected Sinks.Empty otherClosedSink; @BeforeEach void setUp() { allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); connection = new TestDuplexConnection(allocator); + this.thisClosedSink = Sinks.empty(); + this.otherClosedSink = Sinks.empty(); rSocketRequester = new RSocketRequester( connection, @@ -77,7 +82,9 @@ void setUp() { 0, null, __ -> null, - null); + null, + thisClosedSink, + otherClosedSink.asMono().and(thisClosedSink.asMono())); } @ParameterizedTest diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 183785d2f..5861a459a 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -1453,8 +1453,14 @@ public void testWorkaround959(String type) { } public static class ClientSocketRule extends AbstractSocketRule { + + protected Sinks.Empty thisClosedSink; + protected Sinks.Empty otherClosedSink; + @Override protected RSocketRequester newRSocket() { + this.thisClosedSink = Sinks.empty(); + this.otherClosedSink = Sinks.empty(); return new RSocketRequester( connection, PayloadDecoder.ZERO_COPY, @@ -1466,7 +1472,9 @@ protected RSocketRequester newRSocket() { Integer.MAX_VALUE, null, (__) -> null, - null); + null, + thisClosedSink, + otherClosedSink.asMono().and(thisClosedSink.asMono())); } public int getStreamIdForRequestType(FrameType expectedFrameType) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index c0f64469c..cbfc05ea3 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -1184,6 +1184,7 @@ public static class ServerSocketRule extends AbstractSocketRule onCloseSink; @Override protected void doInit() { @@ -1220,6 +1221,7 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) { @Override protected RSocketResponder newRSocket() { + onCloseSink = Sinks.empty(); return new RSocketResponder( connection, acceptingSocket, @@ -1228,7 +1230,8 @@ protected RSocketResponder newRSocket() { 0, maxFrameLength, maxInboundPayloadSize, - __ -> requestInterceptor); + __ -> requestInterceptor, + onCloseSink); } private void sendRequest(int streamId, FrameType frameType) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index c9904d583..98cc94087 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -509,6 +509,8 @@ public static class SocketRule { private RSocket requestAcceptor; private LeaksTrackingByteBufAllocator allocator; + protected Sinks.Empty thisClosedSink; + protected Sinks.Empty otherClosedSink; public LeaksTrackingByteBufAllocator alloc() { return allocator; @@ -519,6 +521,9 @@ public void init() { serverProcessor = Sinks.many().multicast().directBestEffort(); clientProcessor = Sinks.many().multicast().directBestEffort(); + this.thisClosedSink = Sinks.empty(); + this.otherClosedSink = Sinks.empty(); + LocalDuplexConnection serverConnection = new LocalDuplexConnection("server", allocator, clientProcessor, serverProcessor); LocalDuplexConnection clientConnection = @@ -566,7 +571,8 @@ public Flux requestChannel(Publisher payloads) { 0, FRAME_LENGTH_MASK, Integer.MAX_VALUE, - __ -> null); + __ -> null, + otherClosedSink); crs = new RSocketRequester( @@ -580,7 +586,9 @@ public Flux requestChannel(Publisher payloads) { 0, null, __ -> null, - null); + null, + thisClosedSink, + otherClosedSink.asMono().and(thisClosedSink.asMono())); } public void setRequestAcceptor(RSocket requestAcceptor) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java index 44ff78a64..e85c5856e 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java @@ -69,6 +69,8 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); TestDuplexConnection conn = new TestDuplexConnection(allocator); + Sinks.Empty onThisSideClosedSink = Sinks.empty(); + RSocketRequester rSocket = new RSocketRequester( conn, @@ -81,7 +83,9 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { 0, null, __ -> null, - null); + null, + onThisSideClosedSink, + onThisSideClosedSink.asMono()); String errorMsg = "error"; @@ -107,6 +111,7 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() { LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); TestDuplexConnection conn = new TestDuplexConnection(allocator); + Sinks.Empty onThisSideClosedSink = Sinks.empty(); RSocketRequester rSocket = new RSocketRequester( conn, @@ -119,7 +124,9 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() { 0, null, __ -> null, - null); + null, + onThisSideClosedSink, + onThisSideClosedSink.asMono()); conn.addToReceivedBuffer( ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new RejectedSetupException("error"))); diff --git a/rsocket-examples/build.gradle b/rsocket-examples/build.gradle index 423acec79..a49570776 100644 --- a/rsocket-examples/build.gradle +++ b/rsocket-examples/build.gradle @@ -28,7 +28,6 @@ dependencies { implementation "io.micrometer:micrometer-core" implementation "io.micrometer:micrometer-tracing" implementation project(":rsocket-micrometer") - testImplementation 'org.awaitility:awaitility' runtimeOnly 'ch.qos.logback:logback-classic' @@ -37,6 +36,7 @@ dependencies { testImplementation 'org.mockito:mockito-core' testImplementation 'org.assertj:assertj-core' testImplementation 'io.projectreactor:reactor-test' + testImplementation 'org.awaitility:awaitility' testImplementation "io.micrometer:micrometer-test" testImplementation "io.micrometer:micrometer-tracing-integration-test" diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index 113b7a2f8..1b3779e85 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -79,10 +79,10 @@ public Mono connect() { Sinks.One inSink = Sinks.one(); Sinks.One outSink = Sinks.one(); - UnboundedProcessor in = new UnboundedProcessor(() -> inSink.tryEmitValue(inSink)); - UnboundedProcessor out = new UnboundedProcessor(() -> outSink.tryEmitValue(outSink)); + UnboundedProcessor in = new UnboundedProcessor(inSink::tryEmitEmpty); + UnboundedProcessor out = new UnboundedProcessor(outSink::tryEmitEmpty); - Mono onClose = inSink.asMono().zipWith(outSink.asMono()).then(); + Mono onClose = inSink.asMono().and(outSink.asMono()); server.apply(new LocalDuplexConnection(name, allocator, out, in, onClose)).subscribe(); diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index 08fd780dc..c1d0fd2a3 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -36,7 +36,7 @@ final class LocalDuplexConnection implements DuplexConnection { private final LocalSocketAddress address; private final ByteBufAllocator allocator; - private final Flux in; + private final UnboundedProcessor in; private final Mono onClose; @@ -54,7 +54,7 @@ final class LocalDuplexConnection implements DuplexConnection { LocalDuplexConnection( String name, ByteBufAllocator allocator, - Flux in, + UnboundedProcessor in, UnboundedProcessor out, Mono onClose) { this.address = new LocalSocketAddress(name); @@ -111,6 +111,11 @@ public SocketAddress remoteAddress() { return address; } + @Override + public String toString() { + return "LocalDuplexConnection{" + "address=" + address + "hash=" + hashCode() + '}'; + } + static class ByteBufReleaserOperator implements CoreSubscriber, Subscription, Fuseable.QueueSubscription { diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index 0445f5c02..f5d36269c 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -31,7 +31,7 @@ /** An implementation of {@link DuplexConnection} that connects via TCP. */ public final class TcpDuplexConnection extends BaseDuplexConnection { - + private final String side; private final Connection connection; /** @@ -40,14 +40,19 @@ public final class TcpDuplexConnection extends BaseDuplexConnection { * @param connection the {@link Connection} for managing the server */ public TcpDuplexConnection(Connection connection) { + this("unknown", connection); + } + + /** + * Creates a new instance + * + * @param connection the {@link Connection} for managing the server + */ + public TcpDuplexConnection(String side, Connection connection) { this.connection = Objects.requireNonNull(connection, "connection must not be null"); + this.side = side; - connection - .outbound() - .send(sender.hide()) - .then() - .doFinally(__ -> connection.dispose()) - .subscribe(); + connection.outbound().send(sender).then().doFinally(__ -> connection.dispose()).subscribe(); } @Override @@ -85,4 +90,9 @@ public Flux receive() { public void sendFrame(int streamId, ByteBuf frame) { super.sendFrame(streamId, FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame)); } + + @Override + public String toString() { + return "TcpDuplexConnection{" + "side='" + side + '\'' + ", connection=" + connection + '}'; + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 9deef6030..8f1170c5b 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -36,7 +36,7 @@ * stitched back on for frames received. */ public final class WebsocketDuplexConnection extends BaseDuplexConnection { - + private final String side; private final Connection connection; /** @@ -45,11 +45,21 @@ public final class WebsocketDuplexConnection extends BaseDuplexConnection { * @param connection the {@link Connection} to for managing the server */ public WebsocketDuplexConnection(Connection connection) { + this("unknown", connection); + } + + /** + * Creates a new instance + * + * @param connection the {@link Connection} to for managing the server + */ + public WebsocketDuplexConnection(String side, Connection connection) { this.connection = Objects.requireNonNull(connection, "connection must not be null"); + this.side = side; connection .outbound() - .sendObject(sender.map(BinaryWebSocketFrame::new).hide()) + .sendObject(sender.map(BinaryWebSocketFrame::new)) .then() .doFinally(__ -> connection.dispose()) .subscribe(); @@ -85,4 +95,15 @@ public void sendErrorAndClose(RSocketErrorException e) { final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e); sender.tryEmitFinal(errorFrame); } + + @Override + public String toString() { + return "WebsocketDuplexConnection{" + + "side='" + + side + + '\'' + + ", connection=" + + connection + + '}'; + } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java index f64c6063c..84214b98c 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java @@ -116,6 +116,6 @@ public Mono connect() { return client .doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec(maxFrameLength))) .connect() - .map(TcpDuplexConnection::new); + .map(connection -> new TcpDuplexConnection("client", connection)); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java index fe66da50a..86be47893 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java @@ -172,6 +172,6 @@ public Mono connect() { .websocket(specBuilder.build()) .uri(path) .connect() - .map(WebsocketDuplexConnection::new); + .map(connection -> new WebsocketDuplexConnection("client", connection)); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java index effc7bed5..32562c4a4 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java @@ -114,7 +114,7 @@ public Mono start(ConnectionAcceptor acceptor) { c -> { c.addHandlerLast(new RSocketLengthCodec(maxFrameLength)); acceptor - .apply(new TcpDuplexConnection(c)) + .apply(new TcpDuplexConnection("server", c)) .then(Mono.never()) .subscribe(c.disposeSubscriber()); }) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java index 38344c472..db13720e7 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java @@ -80,6 +80,8 @@ public Mono start(ConnectionAcceptor acceptor) { public static BiFunction> newHandler( ConnectionAcceptor acceptor) { return (in, out) -> - acceptor.apply(new WebsocketDuplexConnection((Connection) in)).then(out.neverComplete()); + acceptor + .apply(new WebsocketDuplexConnection("server", (Connection) in)) + .then(out.neverComplete()); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java index 81ac8dcb6..4fe736fad 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java @@ -117,7 +117,7 @@ public Mono start(ConnectionAcceptor acceptor) { return response.sendWebsocket( (in, out) -> acceptor - .apply(new WebsocketDuplexConnection((Connection) in)) + .apply(new WebsocketDuplexConnection("server", (Connection) in)) .then(out.neverComplete()), specBuilder.build()); })