2121import io .netty .channel .ChannelInboundHandlerAdapter ;
2222import io .netty .channel .embedded .EmbeddedChannel ;
2323import io .netty .util .ReferenceCountUtil ;
24- import org .junit .Assert ;
2524import org .junit .Test ;
2625
2726import java .util .List ;
2827import java .util .concurrent .BlockingQueue ;
2928import java .util .concurrent .LinkedBlockingDeque ;
3029
30+ import static org .junit .Assert .*;
31+
3132public class ByteToMessageDecoderTest {
3233
3334 @ Test
@@ -37,7 +38,7 @@ public void testRemoveItself() {
3738
3839 @ Override
3940 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
40- Assert . assertFalse (removed );
41+ assertFalse (removed );
4142 in .readByte ();
4243 ctx .pipeline ().remove (this );
4344 removed = true ;
@@ -47,20 +48,20 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
4748 ByteBuf buf = Unpooled .wrappedBuffer (new byte [] {'a' , 'b' , 'c' });
4849 channel .writeInbound (buf .copy ());
4950 ByteBuf b = channel .readInbound ();
50- Assert . assertEquals (b , buf .skipBytes (1 ));
51+ assertEquals (b , buf .skipBytes (1 ));
5152 b .release ();
5253 buf .release ();
5354 }
5455
5556 @ Test
5657 public void testRemoveItselfWriteBuffer () {
57- final ByteBuf buf = Unpooled .buffer ().writeBytes (new byte []{'a' , 'b' , 'c' });
58+ final ByteBuf buf = Unpooled .buffer ().writeBytes (new byte [] {'a' , 'b' , 'c' });
5859 EmbeddedChannel channel = new EmbeddedChannel (new ByteToMessageDecoder () {
5960 private boolean removed ;
6061
6162 @ Override
6263 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
63- Assert . assertFalse (removed );
64+ assertFalse (removed );
6465 in .readByte ();
6566 ctx .pipeline ().remove (this );
6667
@@ -71,8 +72,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
7172 });
7273
7374 channel .writeInbound (buf .copy ());
75+ ByteBuf expected = Unpooled .wrappedBuffer (new byte [] {'b' , 'c' });
7476 ByteBuf b = channel .readInbound ();
75- Assert .assertEquals (b , Unpooled .wrappedBuffer (new byte [] { 'b' , 'c' }));
77+ assertEquals (expected , b );
78+ expected .release ();
7679 buf .release ();
7780 b .release ();
7881 }
@@ -83,20 +86,20 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
8386 */
8487 @ Test
8588 public void testInternalBufferClearReadAll () {
86- final ByteBuf buf = ReferenceCountUtil .releaseLater (Unpooled .buffer ().writeBytes (new byte []{'a' }));
89+ final ByteBuf buf = ReferenceCountUtil .releaseLater (Unpooled .buffer ().writeBytes (new byte [] {'a' }));
8790 EmbeddedChannel channel = new EmbeddedChannel (new ByteToMessageDecoder () {
8891 @ Override
8992 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
9093 ByteBuf byteBuf = internalBuffer ();
91- Assert . assertEquals (1 , byteBuf .refCnt ());
94+ assertEquals (1 , byteBuf .refCnt ());
9295 in .readByte ();
9396 // Removal from pipeline should clear internal buffer
9497 ctx .pipeline ().remove (this );
95- Assert . assertEquals (0 , byteBuf .refCnt ());
98+ assertEquals (0 , byteBuf .refCnt ());
9699 }
97100 });
98- Assert . assertFalse (channel .writeInbound (buf ));
99- Assert . assertFalse (channel .finish ());
101+ assertFalse (channel .writeInbound (buf ));
102+ assertFalse (channel .finish ());
100103 }
101104
102105 /**
@@ -105,28 +108,32 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
105108 */
106109 @ Test
107110 public void testInternalBufferClearReadPartly () {
108- final ByteBuf buf = ReferenceCountUtil .releaseLater (Unpooled .buffer ().writeBytes (new byte []{'a' , 'b' }));
111+ final ByteBuf buf = ReferenceCountUtil .releaseLater (Unpooled .buffer ().writeBytes (new byte [] {'a' , 'b' }));
109112 EmbeddedChannel channel = new EmbeddedChannel (new ByteToMessageDecoder () {
110113 @ Override
111114 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
112115 ByteBuf byteBuf = internalBuffer ();
113- Assert . assertEquals (1 , byteBuf .refCnt ());
116+ assertEquals (1 , byteBuf .refCnt ());
114117 in .readByte ();
115118 // Removal from pipeline should clear internal buffer
116119 ctx .pipeline ().remove (this );
117- Assert . assertEquals (0 , byteBuf .refCnt ());
120+ assertEquals (0 , byteBuf .refCnt ());
118121 }
119122 });
120- Assert .assertTrue (channel .writeInbound (buf ));
121- Assert .assertTrue (channel .finish ());
122- Assert .assertEquals (channel .readInbound (), Unpooled .wrappedBuffer (new byte [] {'b' }));
123- Assert .assertNull (channel .readInbound ());
123+ assertTrue (channel .writeInbound (buf ));
124+ assertTrue (channel .finish ());
125+ ByteBuf expected = Unpooled .wrappedBuffer (new byte [] {'b' });
126+ ByteBuf b = channel .readInbound ();
127+ assertEquals (expected , b );
128+ assertNull (channel .readInbound ());
129+ expected .release ();
130+ b .release ();
124131 }
125132
126133 @ Test
127134 public void testFireChannelReadCompleteOnInactive () throws InterruptedException {
128135 final BlockingQueue <Integer > queue = new LinkedBlockingDeque <Integer >();
129- final ByteBuf buf = ReferenceCountUtil .releaseLater (Unpooled .buffer ().writeBytes (new byte []{'a' , 'b' }));
136+ final ByteBuf buf = ReferenceCountUtil .releaseLater (Unpooled .buffer ().writeBytes (new byte [] {'a' , 'b' }));
130137 EmbeddedChannel channel = new EmbeddedChannel (new ByteToMessageDecoder () {
131138 @ Override
132139 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
@@ -153,11 +160,43 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
153160 }
154161 }
155162 });
156- Assert . assertFalse (channel .writeInbound (buf ));
163+ assertFalse (channel .writeInbound (buf ));
157164 channel .finish ();
158- Assert .assertEquals (1 , (int ) queue .take ());
159- Assert .assertEquals (2 , (int ) queue .take ());
160- Assert .assertEquals (3 , (int ) queue .take ());
161- Assert .assertTrue (queue .isEmpty ());
165+ assertEquals (1 , (int ) queue .take ());
166+ assertEquals (2 , (int ) queue .take ());
167+ assertEquals (3 , (int ) queue .take ());
168+ assertTrue (queue .isEmpty ());
169+ }
170+
171+ // See https://github.com/netty/netty/issues/4635
172+ @ Test
173+ public void testRemoveWhileInCallDecode () {
174+ final Object upgradeMessage = new Object ();
175+ final ByteToMessageDecoder decoder = new ByteToMessageDecoder () {
176+ @ Override
177+ protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
178+ assertEquals ('a' , in .readByte ());
179+ out .add (upgradeMessage );
180+ }
181+ };
182+
183+ EmbeddedChannel channel = new EmbeddedChannel (decoder , new ChannelInboundHandlerAdapter () {
184+ @ Override
185+ public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception {
186+ if (msg == upgradeMessage ) {
187+ ctx .pipeline ().remove (decoder );
188+ return ;
189+ }
190+ ctx .fireChannelRead (msg );
191+ }
192+ });
193+
194+ ByteBuf buf = Unpooled .wrappedBuffer (new byte [] { 'a' , 'b' , 'c' });
195+ assertTrue (channel .writeInbound (buf .copy ()));
196+ ByteBuf b = channel .readInbound ();
197+ assertEquals (b , buf .skipBytes (1 ));
198+ assertFalse (channel .finish ());
199+ buf .release ();
200+ b .release ();
162201 }
163202}
0 commit comments