@@ -94,64 +94,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
94
94
95
95
private static final int MAX_CONTENT_LENGTH = ByteSizeUnit .MB .toIntBytes (50 );
96
96
97
- private static long transportStatsRequestBytesSize (Ctx ctx ) {
98
- var httpTransport = internalCluster ().getInstance (HttpServerTransport .class , ctx .nodeName );
99
- var stats = httpTransport .stats ().clientStats ();
100
- var bytes = 0L ;
101
- for (var s : stats ) {
102
- bytes += s .requestSizeBytes ();
103
- }
104
- return bytes ;
105
- }
106
-
107
- static int MBytes (int m ) {
108
- return m * 1024 * 1024 ;
109
- }
110
-
111
- static <T > T safePoll (BlockingDeque <T > queue ) {
112
- try {
113
- var t = queue .poll (SAFE_AWAIT_TIMEOUT .seconds (), TimeUnit .SECONDS );
114
- assertNotNull ("queue is empty" , t );
115
- return t ;
116
- } catch (InterruptedException e ) {
117
- Thread .currentThread ().interrupt ();
118
- throw new AssertionError (e );
119
- }
120
- }
121
-
122
- private static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
123
- var req = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
124
- req .headers ().add (CONTENT_LENGTH , content .readableBytes ());
125
- req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
126
- req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
127
- return req ;
128
- }
129
-
130
- private static HttpRequest httpRequest (String opaqueId , int contentLength ) {
131
- return httpRequest (ControlServerRequestPlugin .ROUTE , opaqueId , contentLength );
132
- }
133
-
134
- private static HttpRequest httpRequest (String uri , String opaqueId , int contentLength ) {
135
- var req = new DefaultHttpRequest (HTTP_1_1 , POST , uri );
136
- req .headers ().add (CONTENT_LENGTH , contentLength );
137
- req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
138
- req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
139
- return req ;
140
- }
141
-
142
- private static HttpContent randomContent (int size , boolean isLast ) {
143
- var buf = Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
144
- if (isLast ) {
145
- return new DefaultLastHttpContent (buf );
146
- } else {
147
- return new DefaultHttpContent (buf );
148
- }
149
- }
150
-
151
- private static ByteBuf randomByteBuf (int size ) {
152
- return Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
153
- }
154
-
155
97
@ Override
156
98
protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
157
99
Settings .Builder builder = Settings .builder ().put (super .nodeSettings (nodeOrdinal , otherSettings ));
@@ -236,6 +178,8 @@ public void testClientConnectionCloseMidStream() throws Exception {
236
178
237
179
// await stream handler is ready and request full content
238
180
var handler = ctx .awaitRestChannelAccepted (opaqueId );
181
+ assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
182
+
239
183
assertFalse (handler .streamClosed );
240
184
241
185
// terminate client connection
@@ -246,7 +190,10 @@ public void testClientConnectionCloseMidStream() throws Exception {
246
190
handler .stream .next ();
247
191
248
192
// wait for resources to be released
249
- assertBusy (() -> assertTrue (handler .streamClosed ));
193
+ assertBusy (() -> {
194
+ assertEquals (0 , handler .stream .bufSize ());
195
+ assertTrue (handler .streamClosed );
196
+ });
250
197
}
251
198
}
252
199
@@ -261,11 +208,15 @@ public void testServerCloseConnectionMidStream() throws Exception {
261
208
262
209
// await stream handler is ready and request full content
263
210
var handler = ctx .awaitRestChannelAccepted (opaqueId );
211
+ assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
264
212
assertFalse (handler .streamClosed );
265
213
266
214
// terminate connection on server and wait resources are released
267
215
handler .channel .request ().getHttpChannel ().close ();
268
- assertBusy (() -> assertTrue (handler .streamClosed ));
216
+ assertBusy (() -> {
217
+ assertEquals (0 , handler .stream .bufSize ());
218
+ assertTrue (handler .streamClosed );
219
+ });
269
220
}
270
221
}
271
222
@@ -279,12 +230,16 @@ public void testServerExceptionMidStream() throws Exception {
279
230
280
231
// await stream handler is ready and request full content
281
232
var handler = ctx .awaitRestChannelAccepted (opaqueId );
233
+ assertBusy (() -> assertNotEquals (0 , handler .stream .bufSize ()));
282
234
assertFalse (handler .streamClosed );
283
235
284
236
handler .shouldThrowInsideHandleChunk = true ;
285
237
handler .stream .next ();
286
238
287
- assertBusy (() -> assertTrue (handler .streamClosed ));
239
+ assertBusy (() -> {
240
+ assertEquals (0 , handler .stream .bufSize ());
241
+ assertTrue (handler .streamClosed );
242
+ });
288
243
}
289
244
}
290
245
@@ -325,7 +280,7 @@ public void testClientBackpressure() throws Exception {
325
280
});
326
281
handler .readBytes (partSize );
327
282
}
328
- assertTrue (handler .recvLast );
283
+ assertTrue (handler .stream . hasLast () );
329
284
}
330
285
}
331
286
@@ -430,6 +385,16 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
430
385
}
431
386
}
432
387
388
+ private static long transportStatsRequestBytesSize (Ctx ctx ) {
389
+ var httpTransport = internalCluster ().getInstance (HttpServerTransport .class , ctx .nodeName );
390
+ var stats = httpTransport .stats ().clientStats ();
391
+ var bytes = 0L ;
392
+ for (var s : stats ) {
393
+ bytes += s .requestSizeBytes ();
394
+ }
395
+ return bytes ;
396
+ }
397
+
433
398
/**
434
399
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
435
400
*/
@@ -524,15 +489,63 @@ private String opaqueId(int reqNo) {
524
489
return getTestName () + "-" + reqNo ;
525
490
}
526
491
527
- private Ctx setupClientCtx () throws Exception {
492
+ static int MBytes (int m ) {
493
+ return m * 1024 * 1024 ;
494
+ }
495
+
496
+ static <T > T safePoll (BlockingDeque <T > queue ) {
497
+ try {
498
+ var t = queue .poll (SAFE_AWAIT_TIMEOUT .seconds (), TimeUnit .SECONDS );
499
+ assertNotNull ("queue is empty" , t );
500
+ return t ;
501
+ } catch (InterruptedException e ) {
502
+ Thread .currentThread ().interrupt ();
503
+ throw new AssertionError (e );
504
+ }
505
+ }
506
+
507
+ static FullHttpRequest fullHttpRequest (String opaqueId , ByteBuf content ) {
508
+ var req = new DefaultFullHttpRequest (HTTP_1_1 , POST , ControlServerRequestPlugin .ROUTE , Unpooled .wrappedBuffer (content ));
509
+ req .headers ().add (CONTENT_LENGTH , content .readableBytes ());
510
+ req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
511
+ req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
512
+ return req ;
513
+ }
514
+
515
+ static HttpRequest httpRequest (String opaqueId , int contentLength ) {
516
+ return httpRequest (ControlServerRequestPlugin .ROUTE , opaqueId , contentLength );
517
+ }
518
+
519
+ static HttpRequest httpRequest (String uri , String opaqueId , int contentLength ) {
520
+ var req = new DefaultHttpRequest (HTTP_1_1 , POST , uri );
521
+ req .headers ().add (CONTENT_LENGTH , contentLength );
522
+ req .headers ().add (CONTENT_TYPE , APPLICATION_JSON );
523
+ req .headers ().add (Task .X_OPAQUE_ID_HTTP_HEADER , opaqueId );
524
+ return req ;
525
+ }
526
+
527
+ static HttpContent randomContent (int size , boolean isLast ) {
528
+ var buf = Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
529
+ if (isLast ) {
530
+ return new DefaultLastHttpContent (buf );
531
+ } else {
532
+ return new DefaultHttpContent (buf );
533
+ }
534
+ }
535
+
536
+ static ByteBuf randomByteBuf (int size ) {
537
+ return Unpooled .wrappedBuffer (randomByteArrayOfLength (size ));
538
+ }
539
+
540
+ Ctx setupClientCtx () throws Exception {
528
541
var nodeName = internalCluster ().getRandomNodeName ();
529
542
var clientRespQueue = new LinkedBlockingDeque <>(16 );
530
543
var bootstrap = bootstrapClient (nodeName , clientRespQueue );
531
544
var channel = bootstrap .connect ().sync ().channel ();
532
545
return new Ctx (getTestName (), nodeName , bootstrap , channel , clientRespQueue );
533
546
}
534
547
535
- private Bootstrap bootstrapClient (String node , BlockingQueue <Object > queue ) {
548
+ Bootstrap bootstrapClient (String node , BlockingQueue <Object > queue ) {
536
549
var httpServer = internalCluster ().getInstance (HttpServerTransport .class , node );
537
550
var remoteAddr = randomFrom (httpServer .boundAddress ().boundAddresses ());
538
551
return new Bootstrap ().group (new NioEventLoopGroup (1 ))
@@ -570,13 +583,9 @@ protected boolean addMockHttpTransport() {
570
583
return false ; // enable http
571
584
}
572
585
573
- private record Ctx (
574
- String testName ,
575
- String nodeName ,
576
- Bootstrap clientBootstrap ,
577
- Channel clientChannel ,
578
- BlockingDeque <Object > clientRespQueue
579
- ) implements AutoCloseable {
586
+ record Ctx (String testName , String nodeName , Bootstrap clientBootstrap , Channel clientChannel , BlockingDeque <Object > clientRespQueue )
587
+ implements
588
+ AutoCloseable {
580
589
581
590
@ Override
582
591
public void close () throws Exception {
@@ -601,7 +610,7 @@ ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception
601
610
}
602
611
}
603
612
604
- private static class ServerRequestHandler implements BaseRestHandler .RequestBodyChunkConsumer {
613
+ static class ServerRequestHandler implements BaseRestHandler .RequestBodyChunkConsumer {
605
614
final SubscribableListener <Void > channelAccepted = new SubscribableListener <>();
606
615
final String opaqueId ;
607
616
final BlockingDeque <Chunk > recvChunks = new LinkedBlockingDeque <>();
0 commit comments