-
Notifications
You must be signed in to change notification settings - Fork 3.9k
11246 :: Unexpected error when server expands a compressed message to learn it is too large #12360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
… learn it is too large
… learn it is too large
… learn it is too large
if (t instanceof StatusRuntimeException) { | ||
if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { | ||
statusToPropagate = ((StatusRuntimeException) t).getStatus().withCause(t); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be refactored to use a single if. But, personally I would extract out the explicit conversion of t into a variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, @AgraVator . I've addressed the comments.
ServerStreamListener mockListener = mock(ServerStreamListener.class); | ||
listener.setListener(mockListener); | ||
|
||
RuntimeException expectedT = new RuntimeException(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expectedT ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AgraVator , The new test cases are inspired by the existing junit's like onReady_runtimeExceptionCancelsCall()/halfClosed_runtimeExceptionCancelsCall , following the same naming format. Please advise if you would prefer to change the variable to expected if We suspect it's a typo or if the current naming was intentional.
if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { | ||
statusToPropagate = ((StatusRuntimeException) t).getStatus().withCause(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there no other cases where we can get "RESOURCE_EXHAUSTED" ? As, this is not tied specifically to marshaling the intended behavior can change if and when new "RESOURCE_EXHAUSTED" are added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AgraVator , I've addressed the review comments and changed the logic to handle both StatusRuntimeException and StatusException , I also maintained the existing exception propagation as same for other status code except RESOURCE_EXHAUSTED, Please review and share your feedback.
} else if (t instanceof StatusException) { | ||
extractedStatus = ((StatusException) t).getStatus(); | ||
} | ||
if (extractedStatus != null && extractedStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, no. We don't know the source of this status, and it could have trivially come from an outgoing request (where the server acts like a client to another backend). There's nothing inherently special about RESOURCE_EXHAUSTED that it should be propagated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since MessageDeframer.read
already declares throws IOException
can we make MessageDeframer.verifySize
just throw an IOException with message "Decompressed gRPC message exceeds maximum size " + maxMessageSize, and in ServerImpl.internalClose
check for this exception type and error message prefix and only in that case **formulate and ** send Status RESOURCE_EXHAUSED to the client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IOException
is wrapped as InvalidProtocolBufferException
, which makes it harder to identify
Simpler with
- new
class TooLongDecompressedMessageException extends RuntimeException
- separate catch in
ServerImpl.JumpToApplicationThreadServerStreamListener#messagesAvailable()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ejona86 , @kannanjgithub and @panchenko I've followed the approach of retrieving the message from the Throwable instance and propagating the exception cause if it matches the RESOURCE_EXHAUSTED message like "Decompressed gRPC message exceeds maximum size." Please review the changes once you get some bandwidth and share your thoughts.
String message = t.getMessage();
if (extractedStatus != null && extractedStatus.getCode() == Status.Code.RESOURCE_EXHAUSTED
&& message != null
&& message.contains("Decompressed gRPC message exceeds maximum size")) {
statusToPropagate = extractedStatus.withCause(t);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
diff --git a/core/src/main/java/io/grpc/internal/MessageDeframer.java b/core/src/main/java/io/grpc/internal/MessageDeframer.java
index 13a01efec..7fa664c6c 100644
--- a/core/src/main/java/io/grpc/internal/MessageDeframer.java
+++ b/core/src/main/java/io/grpc/internal/MessageDeframer.java
@@ -519,13 +520,19 @@ public class MessageDeframer implements Closeable, Deframer {
private void verifySize() {
if (count > maxMessageSize) {
- throw Status.RESOURCE_EXHAUSTED
- .withDescription("Decompressed gRPC message exceeds maximum size " + maxMessageSize)
- .asRuntimeException();
+ throw new TooLongDecompressedMessageException(
+ Status.RESOURCE_EXHAUSTED.withDescription(
+ "Decompressed gRPC message exceeds maximum size " + maxMessageSize));
}
}
}
+ static class TooLongDecompressedMessageException extends StatusRuntimeException {
+ TooLongDecompressedMessageException(Status status) {
+ super(status);
+ }
+ }
+
private static class SingleMessageProducer implements StreamListener.MessageProducer {
private InputStream message;
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index dc0709e1f..ffd37fd84 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -832,6 +833,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
PerfMark.attachTag(tag);
PerfMark.linkIn(link);
getListener().messagesAvailable(producer);
+ } catch (TooLongDecompressedMessageException e) {
+ stream.close(e.getStatus(), new Metadata());
} catch (Throwable t) {
internalClose(t);
throw t;
So it's also possible to control server-side logs (not logged without the last throw t;
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote this yesterday after panchenko's first comment (so I had read kannanjgithub's comment as well), but didn't have time to talk about ClientCallImpl's approach.
ServerImpl.JumpToApplicationThreadServerStreamListener.internalClose()
is processing any exception thrown from the application. I don't think we want to mix the exception handling for this with exceptions from the application.
That means we should be handling this is ServerCallImpl.ServerStreamListenerImpl.messagesAvailableInternal()
. Ideally, we wouldn't throw an exception to the application, and it looks like we can do that from ServerCallImpl.
I think we should do something like:
try {
parsed = call.method.parseRequest(message)
} catch (...) {
call.cancelled = true;
// or call stream.cancel() directly; but we'd need to call the serverCallTracer
handleInternalError(...);
return;
}
That will discard all further callbacks except the cancellation of the RPC.
However, the completion vs cancel at the end of the RPC could still be coming from the original RPC. We must not allow that, as that is essentially corruption (it is a dropped message, and we might actually complete the RPC with OK).
Instead, we need to copy the approach in ClientCallImpl with ClientStreamListenerImpl.exceptionStatus
, where we record the error and when the RPC death callback arrives then we replace the status code with the one stored (and an empty Metadata).
The only detail we have to preserve is completed vs cancelled. Since completed only comes after a graceful close(), there's no races this specific code needs to deal with. (Even if the application had already called close(), that'd be fine because the transport will know what the final result is, and the close() from the application clearly didn't depend on the message we've thrown away.)
Fixes : #11246