Skip to content

Commit e036b1b

Browse files
ryanpbrewsterejona86
authored andcommitted
netty: Allow deframer errors to close stream with a status code
Today, deframer errors cancel the stream without communicating a status code to the peer. This change causes deframer errors to trigger a best-effort attempt to send trailers with a status code so that the peer understands why the stream is being closed. Fixes #3996
1 parent 11612b4 commit e036b1b

File tree

5 files changed

+117
-10
lines changed

5 files changed

+117
-10
lines changed

netty/src/main/java/io/grpc/netty/CancelServerStreamCommand.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,23 @@
2727
final class CancelServerStreamCommand extends WriteQueue.AbstractQueuedCommand {
2828
private final NettyServerStream.TransportState stream;
2929
private final Status reason;
30+
private final PeerNotify peerNotify;
3031

31-
CancelServerStreamCommand(NettyServerStream.TransportState stream, Status reason) {
32+
private CancelServerStreamCommand(
33+
NettyServerStream.TransportState stream, Status reason, PeerNotify peerNotify) {
3234
this.stream = Preconditions.checkNotNull(stream, "stream");
3335
this.reason = Preconditions.checkNotNull(reason, "reason");
36+
this.peerNotify = Preconditions.checkNotNull(peerNotify, "peerNotify");
37+
}
38+
39+
static CancelServerStreamCommand withReset(
40+
NettyServerStream.TransportState stream, Status reason) {
41+
return new CancelServerStreamCommand(stream, reason, PeerNotify.RESET);
42+
}
43+
44+
static CancelServerStreamCommand withReason(
45+
NettyServerStream.TransportState stream, Status reason) {
46+
return new CancelServerStreamCommand(stream, reason, PeerNotify.BEST_EFFORT_STATUS);
3447
}
3548

3649
NettyServerStream.TransportState stream() {
@@ -41,6 +54,10 @@ Status reason() {
4154
return reason;
4255
}
4356

57+
boolean wantsHeaders() {
58+
return peerNotify == PeerNotify.BEST_EFFORT_STATUS;
59+
}
60+
4461
@Override
4562
public boolean equals(Object o) {
4663
if (this == o) {
@@ -68,4 +85,11 @@ public String toString() {
6885
.add("reason", reason)
6986
.toString();
7087
}
88+
89+
private enum PeerNotify {
90+
/** Notify the peer by sending a RST_STREAM with no other information. */
91+
RESET,
92+
/** Notify the peer about the {@link #reason} by sending structured headers, if possible. */
93+
BEST_EFFORT_STATUS,
94+
}
7195
}

netty/src/main/java/io/grpc/netty/NettyServerHandler.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -788,9 +788,37 @@ private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand c
788788
PerfMark.linkIn(cmd.getLink());
789789
// Notify the listener if we haven't already.
790790
cmd.stream().transportReportStatus(cmd.reason());
791-
// Terminate the stream.
792-
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
791+
792+
// Now we need to decide how we're going to notify the peer that this stream is closed.
793+
// If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
794+
// a structured headers frame.
795+
if (shouldCloseStreamWithHeaders(cmd, connection())) {
796+
Metadata md = new Metadata();
797+
md.put(InternalStatus.CODE_KEY, cmd.reason());
798+
if (cmd.reason().getDescription() != null) {
799+
md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
800+
}
801+
Http2Headers headers = Utils.convertServerHeaders(md);
802+
encoder().writeHeaders(
803+
ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
804+
} else {
805+
// Terminate the stream.
806+
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
807+
}
808+
}
809+
}
810+
811+
// Determine whether a CancelServerStreamCommand should try to close the stream with a
812+
// HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
813+
// configure cmd.wantsHeaders()). The state of the stream also has an influence: we
814+
// only try to send HEADERS if the stream exists and hasn't already sent any headers.
815+
private static boolean shouldCloseStreamWithHeaders(
816+
CancelServerStreamCommand cmd, Http2Connection conn) {
817+
if (!cmd.wantsHeaders()) {
818+
return false;
793819
}
820+
Http2Stream stream = conn.stream(cmd.stream().id());
821+
return stream != null && !stream.isHeadersSent();
794822
}
795823

796824
private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,

netty/src/main/java/io/grpc/netty/NettyServerStream.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
130130
@Override
131131
public void cancel(Status status) {
132132
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) {
133-
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
133+
writeQueue.enqueue(CancelServerStreamCommand.withReset(transportState(), status), true);
134134
}
135135
}
136136
}
@@ -189,7 +189,7 @@ public void deframeFailed(Throwable cause) {
189189
log.log(Level.WARNING, "Exception processing message", cause);
190190
Status status = Status.fromThrowable(cause);
191191
transportReportStatus(status);
192-
handler.getWriteQueue().enqueue(new CancelServerStreamCommand(this, status), true);
192+
handler.getWriteQueue().enqueue(CancelServerStreamCommand.withReason(this, status), true);
193193
}
194194

195195
private void onWriteFrameData(ChannelFuture future, int numMessages, int numBytes) {
@@ -222,7 +222,7 @@ private void handleWriteFutureFailures(ChannelFuture future) {
222222
*/
223223
protected void http2ProcessingFailed(Status status) {
224224
transportReportStatus(status);
225-
handler.getWriteQueue().enqueue(new CancelServerStreamCommand(this, status), true);
225+
handler.getWriteQueue().enqueue(CancelServerStreamCommand.withReset(this, status), true);
226226
}
227227

228228
void inboundDataReceived(ByteBuf frame, boolean endOfStream) {

netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,10 @@
8989
import java.io.InputStream;
9090
import java.nio.channels.ClosedChannelException;
9191
import java.util.Arrays;
92+
import java.util.HashMap;
9293
import java.util.LinkedList;
9394
import java.util.List;
95+
import java.util.Map;
9496
import java.util.Queue;
9597
import java.util.concurrent.TimeUnit;
9698
import org.junit.Before;
@@ -469,11 +471,41 @@ public void connectionWindowShouldBeOverridden() throws Exception {
469471
public void cancelShouldSendRstStream() throws Exception {
470472
manualSetUp();
471473
createStream();
472-
enqueue(new CancelServerStreamCommand(stream.transportState(), Status.DEADLINE_EXCEEDED));
474+
enqueue(CancelServerStreamCommand.withReset(stream.transportState(), Status.DEADLINE_EXCEEDED));
473475
verifyWrite().writeRstStream(eq(ctx()), eq(stream.transportState().id()),
474476
eq(Http2Error.CANCEL.code()), any(ChannelPromise.class));
475477
}
476478

479+
@Test
480+
public void cancelWithNotify_shouldSendHeaders() throws Exception {
481+
manualSetUp();
482+
createStream();
483+
484+
enqueue(CancelServerStreamCommand.withReason(
485+
stream.transportState(),
486+
Status.RESOURCE_EXHAUSTED.withDescription("my custom description")
487+
));
488+
489+
ArgumentCaptor<Http2Headers> captor = ArgumentCaptor.forClass(Http2Headers.class);
490+
verifyWrite()
491+
.writeHeaders(
492+
eq(ctx()),
493+
eq(STREAM_ID),
494+
captor.capture(),
495+
eq(0),
496+
eq(true),
497+
any(ChannelPromise.class));
498+
499+
// For arcane reasons, the specific implementation of Http2Headers here doesn't actually support
500+
// methods like `get(...)`, so we have to manually convert it into a map.
501+
Map<String, String> actualHeaders = new HashMap<>();
502+
for (Map.Entry<CharSequence, CharSequence> entry : captor.getValue()) {
503+
actualHeaders.put(entry.getKey().toString(), entry.getValue().toString());
504+
}
505+
assertEquals("8", actualHeaders.get(InternalStatus.CODE_KEY.name()));
506+
assertEquals("my custom description", actualHeaders.get(InternalStatus.MESSAGE_KEY.name()));
507+
}
508+
477509
@Test
478510
public void headersWithInvalidContentTypeShouldFail() throws Exception {
479511
manualSetUp();

netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020
import static com.google.common.truth.Truth.assertWithMessage;
21-
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
2221
import static io.grpc.netty.NettyTestUtil.messageFrame;
2322
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
2423
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
@@ -37,6 +36,7 @@
3736
import static org.mockito.Mockito.verifyNoMoreInteractions;
3837
import static org.mockito.Mockito.when;
3938

39+
import com.google.common.base.Strings;
4040
import com.google.common.collect.ImmutableListMultimap;
4141
import com.google.common.collect.ListMultimap;
4242
import io.grpc.Attributes;
@@ -73,6 +73,8 @@
7373
/** Unit tests for {@link NettyServerStream}. */
7474
@RunWith(JUnit4.class)
7575
public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream> {
76+
private static final int TEST_MAX_MESSAGE_SIZE = 128;
77+
7678
@Mock
7779
protected ServerStreamListener serverListener;
7880

@@ -380,18 +382,39 @@ public void emptyFramerShouldSendNoPayload() {
380382
public void cancelStreamShouldSucceed() {
381383
stream().cancel(Status.DEADLINE_EXCEEDED);
382384
verify(writeQueue).enqueue(
383-
new CancelServerStreamCommand(stream().transportState(), Status.DEADLINE_EXCEEDED),
385+
CancelServerStreamCommand.withReset(stream().transportState(), Status.DEADLINE_EXCEEDED),
384386
true);
385387
}
386388

389+
@Test
390+
public void oversizedMessagesResultInResourceExhaustedTrailers() throws Exception {
391+
@SuppressWarnings("InlineMeInliner") // Requires Java 11
392+
String oversizedMsg = Strings.repeat("a", TEST_MAX_MESSAGE_SIZE + 1);
393+
stream.request(1);
394+
stream.transportState().inboundDataReceived(messageFrame(oversizedMsg), false);
395+
assertNull("message should have caused a deframer error", listenerMessageQueue().poll());
396+
397+
ArgumentCaptor<CancelServerStreamCommand> cancelCmdCap =
398+
ArgumentCaptor.forClass(CancelServerStreamCommand.class);
399+
verify(writeQueue).enqueue(cancelCmdCap.capture(), eq(true));
400+
401+
Status status = Status.RESOURCE_EXHAUSTED
402+
.withDescription("gRPC message exceeds maximum size 128: 129");
403+
404+
CancelServerStreamCommand actualCmd = cancelCmdCap.getValue();
405+
assertThat(actualCmd.reason().getCode()).isEqualTo(status.getCode());
406+
assertThat(actualCmd.reason().getDescription()).isEqualTo(status.getDescription());
407+
assertThat(actualCmd.wantsHeaders()).isTrue();
408+
}
409+
387410
@Override
388411
@SuppressWarnings("DirectInvocationOnMock")
389412
protected NettyServerStream createStream() {
390413
when(handler.getWriteQueue()).thenReturn(writeQueue);
391414
StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP;
392415
TransportTracer transportTracer = new TransportTracer();
393416
NettyServerStream.TransportState state = new NettyServerStream.TransportState(
394-
handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx,
417+
handler, channel.eventLoop(), http2Stream, TEST_MAX_MESSAGE_SIZE, statsTraceCtx,
395418
transportTracer, "method");
396419
NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY,
397420
"test-authority", statsTraceCtx);

0 commit comments

Comments
 (0)