Skip to content

Restore old behavior of NettyAdaptiveCumulator, but avoid using that class if Netty is on version 4.1.111 or later #11367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.Internal;
Expand All @@ -27,6 +28,7 @@
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.Version;
import javax.annotation.Nullable;

/**
Expand All @@ -41,6 +43,30 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
@Nullable
protected final ChannelPromise channelUnused;
private final ChannelLogger negotiationLogger;
private static final boolean usingPre4_1_111_Netty;

static {
// Netty 4.1.111 introduced a change in the behavior of duplicate() method
// that breaks the assumption of the cumulator. We need to detect this version
// and adjust the behavior accordingly.

boolean identifiedOldVersion = false;
try {
Version version = Version.identify().get("netty-buffer");
if (version != null) {
String[] split = version.artifactVersion().split("\\.");
if (split.length >= 3
&& Integer.parseInt(split[0]) == 4
&& Integer.parseInt(split[1]) <= 1
&& Integer.parseInt(split[2]) < 111) {
identifiedOldVersion = true;
}
}
} catch (Exception e) {
// Ignore, we'll assume it's a new version.
}
usingPre4_1_111_Netty = identifiedOldVersion;
}

protected GrpcHttp2ConnectionHandler(
ChannelPromise channelUnused,
Expand All @@ -51,7 +77,16 @@ protected GrpcHttp2ConnectionHandler(
super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
this.negotiationLogger = negotiationLogger;
setCumulator(ADAPTIVE_CUMULATOR);
if (usingPre4_1_111_Netty()) {
// We need to use the adaptive cumulator only if we're using a version of Netty that
// doesn't have the behavior that breaks it.
setCumulator(ADAPTIVE_CUMULATOR);
}
}

@VisibleForTesting
static boolean usingPre4_1_111_Netty() {
return usingPre4_1_111_Netty;
}

/**
Expand Down
50 changes: 26 additions & 24 deletions netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@
/**
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
* compose strategies.
* <br><br>
*
* <p><b><font color="red">Avoid using</font></b>
* {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
* to corruption, where the components' readable area are not equal to the Composite's capacity
* (see https://github.com/netty/netty/issues/12844).
*/

class NettyAdaptiveCumulator implements Cumulator {
Expand Down Expand Up @@ -95,7 +89,8 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
composite.capacity(composite.writerIndex());
}
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation);
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
.addFlattenedComponents(true, cumulation);
}
addInput(alloc, composite, in);
in = null;
Expand All @@ -115,7 +110,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu
@VisibleForTesting
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
if (shouldCompose(composite, in, composeMinSize)) {
composite.addComponent(true, in);
composite.addFlattenedComponents(true, in);
} else {
// The total size of the new data and the last component are below the threshold. Merge them.
mergeWithCompositeTail(alloc, composite, in);
Expand Down Expand Up @@ -161,13 +156,32 @@ static void mergeWithCompositeTail(
ByteBuf tail = composite.component(tailComponentIndex);
ByteBuf newTail = null;
try {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
&& !isCompositeOrWrappedComposite(tail)) {
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.

// Take ownership of the tail.
newTail = tail.retain();

// TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
// the issue fixed.
// In certain cases, removing the CompositeByteBuf component, and then adding it back
// isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
// This happens because the buffer returned by composite.component() has out-of-sync
// indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
// buffer, but doesn't set the indexes.
//
// To get the right indexes we use the fact that composite.internalComponent() returns
// the slice() into the readable portion of the underlying buffer.
// We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
// and combine it with the fact that SlicedByteBuf duplicates have their indexes
// adjusted so they correspond to the to the readable portion of the slice.
//
// Hence composite.internalComponent().duplicate() returns a buffer with the
// indexes that should've been on the composite.component() in the first place.
// Until the issue is fixed, we manually adjust the indexes of the removed component.
ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());

/*
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
*
Expand All @@ -183,11 +197,7 @@ static void mergeWithCompositeTail(
newTail.writeBytes(in);

} else {
// The tail satisfies one or more criteria:
// - Shared
// - Not expandable
// - Composite
// - Wrapped Composite
// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
newTail.setBytes(0, composite, tailStart, tailSize)
.setBytes(tailSize, in, in.readerIndex(), inputSize)
Expand All @@ -200,7 +210,7 @@ static void mergeWithCompositeTail(
// Remove the old tail, reset writer index.
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
// Add back the new tail.
composite.addComponent(true, newTail);
composite.addFlattenedComponents(true, newTail);
// New tail's ownership transferred to the composite buf.
newTail = null;
composite.readerIndex(prevReader);
Expand All @@ -215,12 +225,4 @@ static void mergeWithCompositeTail(
}
}
}

private static boolean isCompositeOrWrappedComposite(ByteBuf tail) {
ByteBuf cur = tail;
while (cur.unwrap() != null) {
cur = cur.unwrap();
}
return cur instanceof CompositeByteBuf;
}
}
65 changes: 30 additions & 35 deletions netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {
@Override
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
// To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
composite.addComponent(true, in);
composite.addFlattenedComponents(true, in);
}
};

Expand Down Expand Up @@ -122,7 +122,7 @@ public void cumulate_contiguousCumulation_newCompositeFromContiguousAndInput() {

@Test
public void cumulate_compositeCumulation_inputAppendedAsANewComponent() {
CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous);
CompositeByteBuf composite = alloc.compositeBuffer().addFlattenedComponents(true, contiguous);
assertSame(composite, cumulator.cumulate(alloc, composite, in));
assertEquals(DATA_INITIAL, composite.component(0).toString(US_ASCII));
assertEquals(DATA_INCOMING, composite.component(1).toString(US_ASCII));
Expand All @@ -136,7 +136,7 @@ public void cumulate_compositeCumulation_inputAppendedAsANewComponent() {

@Test
public void cumulate_compositeCumulation_inputReleasedOnError() {
CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous);
CompositeByteBuf composite = alloc.compositeBuffer().addFlattenedComponents(true, contiguous);
try {
throwingCumulator.cumulate(alloc, composite, in);
fail("Cumulator didn't throw");
Expand Down Expand Up @@ -208,8 +208,8 @@ public void setUp() {
in = ByteBufUtil.writeAscii(alloc, inData);
tail = ByteBufUtil.writeAscii(alloc, tailData);
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
// Note that addComponent() will not add a new component when tail is not readable.
composite.addComponent(true, tail);
// Note that addFlattenedComponents() will not add a new component when tail is not readable.
composite.addFlattenedComponents(true, tail);
}

@After
Expand Down Expand Up @@ -345,7 +345,7 @@ public void mergeWithCompositeTail_tailExpandable_write() {
assertThat(in.readableBytes()).isAtMost(tail.writableBytes());

// All fits, so tail capacity must stay the same.
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
}

Expand All @@ -362,7 +362,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() {
alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);

// Tail capacity is extended to its fast capacity.
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
}

Expand All @@ -372,7 +372,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
@SuppressWarnings("InlineMeInliner") // Requires Java 11
String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length();
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);

// Make input larger than tailFastCapacity
in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
Expand All @@ -386,6 +386,9 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
}

private void assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity) {
if (!GrpcHttp2ConnectionHandler.usingPre4_1_111_Netty()) {
return; // Netty 4.1.111 doesn't work with NettyAdaptiveCumulator
}
int originalNumComponents = composite.numComponents();

// Handle the case when reader index is beyond all readable bytes of the cumulation.
Expand Down Expand Up @@ -435,21 +438,21 @@ public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() {
@SuppressWarnings("InlineMeInliner") // Requires Java 11
String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailReplaced();
}

@Test
public void mergeWithCompositeTail_tailNotExpandable_shared() {
tail.retain();
composite.addComponent(true, tail);
composite.addFlattenedComponents(true, tail);
assertTailReplaced();
tail.release();
}

@Test
public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
composite.addComponent(true, tail.asReadOnly());
composite.addFlattenedComponents(true, tail.asReadOnly());
assertTailReplaced();
}

Expand Down Expand Up @@ -527,7 +530,8 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail) {
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -560,7 +564,8 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() {
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
tail.asReadOnly()) {
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
ByteBuf buffer) {
throw expectedError;
}
};
Expand Down Expand Up @@ -614,16 +619,20 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));

// Start with a regular cumulation and add the buf as the only component.
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf);
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf);
// Read composite1 buf to the beginning of the numbers.
assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");

// Wrap composite1 into another cumulation. This is similar to
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
CompositeByteBuf composite2 =
alloc.compositeBuffer(8).addComponent(true, composite1);
alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");

if (!GrpcHttp2ConnectionHandler.usingPre4_1_111_Netty()) {
return; // Netty 4.1.111 doesn't work with NettyAdaptiveCumulator
}

// The previous operation does not adjust the read indexes of the underlying buffers,
// only the internal Component offsets. When the cumulator attempts to append the input to
// the tail buffer, it extracts it from the cumulation, writes to it, and then adds it back.
Expand All @@ -637,27 +646,13 @@ public void mergeWithCompositeTail_outOfSyncComposite() {
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
ByteBufUtil.writeAscii(alloc, "56789"));
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
}

@Test
public void mergeWithNonCompositeTail() {
NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);
ByteBufAllocator alloc = new PooledByteBufAllocator();
ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII));
ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII));

CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf);

CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in);

assertEquals("tail-012345", cumulation.toString(US_ASCII));
assertEquals(0, in.refCnt());
assertEquals(1, cumulation.numComponents());

buf.setByte(2, '*').setByte(7, '$');
assertEquals("ta*l-01$345", cumulation.toString(US_ASCII));

composite.release();
// Correctness check: we still have a single component, and this component is still the
// original underlying buffer.
assertThat(cumulation.numComponents()).isEqualTo(1);
// Replace '2' with '*', and '8' with '$'.
buf.setByte(5, '*').setByte(11, '$');
assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
}
}
}
Loading