diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 31286c2bd7b..2d3d456b265 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -30,7 +30,9 @@
import static java.lang.Math.max;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
@@ -669,7 +671,17 @@ private void runInternal() {
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
exceptionThrown(
- Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
+ Status.CANCELLED
+ .withCause(t)
+ .withDescription(
+ "Failed to read message: "
+ + t
+ + ".\n\n***** Exception Trace: *****\n"
+ + Throwables.getStackTraceAsString(t)
+ + "***** END Exception Trace *****\n\n"
+ + "***** Thread Trace: *****\n"
+ + Joiner.on("\n").join(Thread.currentThread().getStackTrace())
+ + "\n***** END Thread Trace: *****\n\n"));
}
}
}
diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
index 25f4f9232cf..1b9b16c180c 100644
--- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
+++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
@@ -34,6 +34,9 @@
*/
@Internal
public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler {
+ protected static final int CUMULATOR_COMPOSE_MIN_SIZE = 1024;
+ public static final Cumulator ADAPTIVE_CUMULATOR =
+ new NettyAdaptiveCumulator(CUMULATOR_COMPOSE_MIN_SIZE);
@Nullable
protected final ChannelPromise channelUnused;
diff --git a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
new file mode 100644
index 00000000000..b27c402c64c
--- /dev/null
+++ b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
+import java.util.logging.Logger;
+
+class NettyAdaptiveCumulator implements io.netty.handler.codec.ByteToMessageDecoder.Cumulator {
+ private static final Logger logger = Logger.getLogger(NettyAdaptiveCumulator.class.getName());
+ private final int composeMinSize;
+
+ NettyAdaptiveCumulator(int composeMinSize) {
+ Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative");
+ this.composeMinSize = composeMinSize;
+ }
+
+ /**
+ * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
+ * compose strategies.
+ *
+ *
This cumulator applies a heuristic to make a decision whether to track a reference to the
+ * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into
+ * the last component (the tail) by performing a memory copy.
+ *
+ *
It is necessary as a protection from a potential attack on the {@link
+ * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case
+ * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator
+ * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra
+ * compute to read the cumulation.
+ *
+ *
Implemented heuristic establishes a minimal threshold for the total size of the tail and
+ * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is
+ * used to avoid a case where attacker alternates the size of data packets to trick the cumulator
+ * into always selecting compose strategy.
+ *
+ *
Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands
+ * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when
+ * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new
+ * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure
+ * runtime {@code O(n^2)} is amortized to {@code O(n)}.
+ */
+ @Override
+ @SuppressWarnings("ReferenceEquality")
+ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
+ if (!cumulation.isReadable()) {
+ cumulation.release();
+ logger.info("-------------------------- Return As Is --------------------------");
+ printBufDebug("INPUT AS IS", in);
+ logger.info("------------------------------------------------------------------");
+ return in;
+ }
+ CompositeByteBuf composite = null;
+ try {
+ if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
+ composite = (CompositeByteBuf) cumulation;
+ // Writer index must equal capacity if we are going to "write"
+ // new components to the end
+ if (composite.writerIndex() != composite.capacity()) {
+ logger.info("-------------------------- Adjust capacity --------------------------");
+ // printBufDebug("COMPOSITE ADJUST: INPUT", in);
+ printBufDebug("COMPOSITE ADJUST: PRE", composite);
+ composite.capacity(composite.writerIndex());
+ printBufDebug("COMPOSITE ADJUST: RESULT", composite);
+ logger.info("----------------------------------------------------------------------");
+ }
+ } else {
+ logger.info("-------------------------- Allocate new composite --------------------------");
+ // printBufDebug("ALLOC NEW: INPUT", in);
+ logger.warning("************ instanceof: " + (cumulation instanceof CompositeByteBuf) + "************");
+ logger.warning("************ refCnt: " + (cumulation.refCnt()) + "************");
+ logger.warning("************ instanceof && refCnt: "
+ + (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) + " ************");
+ logger.warning("************ (instanceof) && refCnt: "
+ + ((cumulation instanceof CompositeByteBuf) && cumulation.refCnt() == 1) + " ************");
+ logger.warning("************ (instanceof) && (refCnt): "
+ + ((cumulation instanceof CompositeByteBuf) && (cumulation.refCnt() == 1)) + " ************");
+ printBufDebug("ALLOC NEW COMPOSITE: CUMULATION", cumulation);
+ composite = alloc.compositeBuffer(Integer.MAX_VALUE)
+ .addFlattenedComponents(true, cumulation);
+ printBufDebug("ALLOC NEW COMPOSITE: RESULT", composite);
+ logger.info("----------------------------------------------------------------------------");
+ }
+ addInput(alloc, composite, in);
+ in = null;
+ return composite;
+ } finally {
+ if (in != null) {
+ // We must release if the ownership was not transferred as otherwise it may produce a leak
+ in.release();
+ // Also release any new buffer allocated if we're not returning it
+ if (composite != null && composite != cumulation) {
+ composite.release();
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
+ if (shouldCompose(composite, in, composeMinSize)) {
+ logger.info("+++++++++++++++++++++ Composing new component +++++++++++++++++++++");
+ printBufDebug("COMPOSITE ADD COMPONENT: INPUT", in);
+ composite.addFlattenedComponents(true, in);
+ printBufDebug("COMPOSITE ADD COMPONENT: RESULT", composite);
+ logger.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
+ } else {
+ // The total size of the new data and the last component are below the threshold. Merge them.
+ mergeWithCompositeTail(alloc, composite, in);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) {
+ int componentCount = composite.numComponents();
+ if (composite.numComponents() == 0) {
+ return true;
+ }
+ int tailSize = composite.capacity() - composite.toByteIndex(componentCount - 1);
+ return tailSize + in.readableBytes() >= composeMinSize;
+ }
+
+ /**
+ * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by
+ * expanding or replacing the tail component of the {@link CompositeByteBuf}.
+ *
+ *
The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying
+ * the tail component into a new buffer, for each incoming single-byte buffer. We append the new
+ * bytes to the tail, when a write (or a fast write) is possible.
+ *
+ *
Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to
+ * achieve runtime amortization.
+ *
+ *
We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)},
+ * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)},
+ * which doubles buffer capacity by normalizing it to the closest power of two. This assumption
+ * is verified in unit tests for this method.
+ */
+ @VisibleForTesting
+ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf composite,
+ ByteBuf in) {
+
+ int newBytes = in.readableBytes();
+ int tailIndex = composite.numComponents() - 1;
+ int tailStart = composite.toByteIndex(tailIndex);
+ int tailBytes = composite.writerIndex() - tailStart;
+ int totalBytes = newBytes + tailBytes;
+
+ ByteBuf tail = composite.component(tailIndex);
+ ByteBuf merged = null;
+
+ try {
+ if (tail.refCnt() == 1 && !tail.isReadOnly() && totalBytes <= tail.maxCapacity()) {
+ logger.info("========================= Extending Tail =========================");
+ printBufDebug("INPUT", in);
+ printBufDebug("COMPOSITE", composite);
+ printBufDebug("component()", composite.component(tailIndex));
+
+ ByteBuf ic = composite.internalComponent(tailIndex);
+ printBufDebug("internalComponent()", "IC", ic);
+ printBufDebug("IC.duplicate()", "IC_DUP", ic.duplicate());
+
+ // Ideal case: the tail isn't shared, and can be expanded to the required capacity.
+ // Take ownership of the tail.
+ merged = tail.retain();
+
+ /*
+ * The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
+ *
+ * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply
+ * updates its boundary to the end of consecutive memory run assigned to this buffer)
+ * - when the required size doesn't fit into writableBytes(), a new buffer is
+ * allocated, and the capacity calculated with alloc.calculateNewCapacity()
+ * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf
+ * is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer.
+ * Unwrapping buffers is unsafe, and potential benefit of fast writes may not be
+ * as pronounced because the capacity is doubled with each reallocation.
+ */
+ merged.writeBytes(in);
+ printBufDebug("MERGED", merged);
+
+ // Store readerIndex to avoid out of bounds writerIndex during component replacement.
+ int prevReader = composite.readerIndex();
+ // Remove the tail, reset writer index, add merged component.
+ composite.removeComponent(tailIndex);
+ composite.setIndex(0, tailStart);
+ composite.addFlattenedComponents(true, merged);
+ merged = null;
+ in.release();
+ in = null;
+ // Restore the reader.
+ composite.readerIndex(prevReader);
+
+ printBufDebug("COMPOSITE MERGE RESULT", composite);
+ logger.info("==================================================================\n\n");
+ } else {
+ logger.info("^^^^^^^^^^^^^^^^^^^^^^^^^^ Reallocating new tail ^^^^^^^^^^^^^^^^^^^^^^^^^^^");
+ printBufDebug("INPUT", in);
+ printBufDebug("COMPOSITE", composite);
+
+ // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
+ merged = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE));
+ merged.setBytes(0, composite, tailStart, tailBytes)
+ .setBytes(tailBytes, in, in.readerIndex(), newBytes)
+ .writerIndex(totalBytes);
+ printBufDebug("NEW MERGED", merged);
+
+ in.readerIndex(in.writerIndex());
+
+ // Store readerIndex to avoid out of bounds writerIndex during component replacement.
+ int prevReader = composite.readerIndex();
+ // Remove the tail, reset writer index, add merged component.
+ composite.removeComponent(tailIndex);
+ composite.setIndex(0, tailStart);
+ composite.addFlattenedComponents(true, merged);
+ merged = null;
+ in.release();
+ in = null;
+ // Restore the reader.
+ composite.readerIndex(prevReader);
+
+ printBufDebug("COMPOSITE REALOC RESULT", composite);
+ logger.info("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
+ }
+
+ } finally {
+ // Input buffer was merged with the tail.
+ if (in != null) {
+ in.release();
+ }
+ // If merge's ownership isn't transferred to the composite buf, release it to prevent a leak.
+ if (merged != null) {
+ merged.release();
+ }
+ }
+ }
+
+ private static void printBufDebug(String title, ByteBuf buf) {
+ printBufDebug(title, title, buf);
+ }
+
+ private static void printBufDebug(String title, String prefix, ByteBuf buf) {
+ String msg = "$$$$$$$$$$$$$$$$$$$$ " + title + " $$$$$$$$$$$$$$$$$$$$";
+ int len = msg.length();
+ msg += "\n";
+ msg += ByteBufUtil.prettyHexDump(buf, 0, buf.readerIndex() + buf.readableBytes()) + "\n";
+ msg += prefix + " " + buf + " refCnt=" + buf.refCnt() + "\n";
+ // msg += prefix + " " + buf.getClass().getName() + "\n";
+ if (buf instanceof CompositeByteBuf) {
+ CompositeByteBuf composite = (CompositeByteBuf) buf;
+ msg += composite.allComponentTypes(prefix);
+ } else {
+ // msg += CompositeByteBuf.getBufTypes(prefix, buf);
+ }
+ msg += Strings.repeat("$", len) + "\n";
+ logger.info(msg);
+ }
+
+ private static String getIndexes(ByteBuf buf) {
+ return "rix=0x" + Integer.toHexString(buf.readerIndex())
+ + " wix=0x" + Integer.toHexString(buf.writerIndex());
+ }
+}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 2fe4d65bfa1..6ce43d51f7b 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -264,6 +264,9 @@ private NettyClientHandler(
this.authority = authority;
this.attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
+ setCumulator(ADAPTIVE_CUMULATOR);
+ // setCumulator(COMPOSITE_CUMULATOR);
+ // setCumulator(MERGE_CUMULATOR);
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@@ -497,6 +500,12 @@ InternalChannelz.Security getSecurityInfo() {
protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
Http2Exception http2Ex) {
logger.log(Level.FINE, "Caught a connection error", cause);
+ if (http2Ex != null) {
+ logger.log(Level.WARNING, "#### http2 ex: " + http2Ex + " err:" + http2Ex.error() + " shut:"
+ + http2Ex.shutdownHint(), cause);
+ } else {
+ logger.log(Level.WARNING, "#### http2 ex: null", cause);
+ }
lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
// Parent class will shut down the Channel
super.onConnectionError(ctx, outbound, cause, http2Ex);
@@ -875,7 +884,7 @@ private Status statusFromH2Error(
if (statusCode == null) {
statusCode = status.getCode();
}
- String debugString = "";
+ String debugString = "statusFromH2Error() debugString: ";
if (debugData != null && debugData.length > 0) {
// If a debug message was provided, use it.
debugString = ", debug data: " + new String(debugData, UTF_8);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index fa21221a0ae..0ccb06a049f 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -280,6 +280,7 @@ private NettyServerHandler(
Attributes eagAttributes) {
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null);
+ // setCumulator(ADAPTIVE_CUMULATOR);
final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index fdff8567294..935a35badaf 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -25,7 +25,9 @@
import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import io.grpc.InternalChannelz;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
@@ -286,7 +288,17 @@ public static Status statusFromThrowable(Throwable t) {
return Status.UNAVAILABLE.withDescription("unresolved address").withCause(t);
}
if (t instanceof Http2Exception) {
- return Status.INTERNAL.withDescription("http2 exception").withCause(t);
+ return Status.INTERNAL
+ .withDescription(
+ "http2 exception: "
+ + t
+ + ".\n\n***** Exception Trace: *****\n"
+ + Throwables.getStackTraceAsString(t)
+ + "***** END Exception Trace *****\n\n"
+ + "***** Thread Trace: *****\n"
+ + Joiner.on("\n").join(Thread.currentThread().getStackTrace())
+ + "\n***** END Thread Trace: *****\n\n")
+ .withCause(t);
}
return s;
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java
new file mode 100644
index 00000000000..09cc88ff1f3
--- /dev/null
+++ b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java
@@ -0,0 +1,592 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static com.google.common.truth.TruthJUnit.assume;
+import static io.netty.util.CharsetUtil.US_ASCII;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.Collection;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Enclosed.class)
+public class NettyAdaptiveCumulatorTest {
+
+ private static Collection