Skip to content

Commit 6cd7e71

Browse files
committed
UNDERTOW-1382: Tests to reproduce this issue
It's much more likely that we will hit this issue when the client is rate limited. Added tests for servlet 3.1 async io with a RequestBufferingHandler which seems to cause the issue reproducibly.
1 parent 4d808be commit 6cd7e71

File tree

3 files changed

+386
-197
lines changed

3 files changed

+386
-197
lines changed
Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
/*
2+
* JBoss, Home of Professional Open Source.
3+
* Copyright 2014 Red Hat, Inc., and individual contributors
4+
* as indicated by the @author tags.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package io.undertow.servlet.test.streams;
20+
21+
import io.undertow.testutils.DefaultServer;
22+
import io.undertow.testutils.HttpClientUtils;
23+
import io.undertow.testutils.TestHttpClient;
24+
import io.undertow.util.StatusCodes;
25+
import org.apache.commons.codec.binary.Hex;
26+
import org.apache.http.HttpResponse;
27+
import org.apache.http.client.methods.CloseableHttpResponse;
28+
import org.apache.http.client.methods.HttpPost;
29+
import org.apache.http.entity.InputStreamEntity;
30+
import org.apache.http.entity.StringEntity;
31+
import org.apache.http.impl.client.CloseableHttpClient;
32+
import org.apache.http.impl.client.HttpClients;
33+
import org.junit.Assert;
34+
import org.junit.Test;
35+
36+
import java.io.ByteArrayInputStream;
37+
import java.io.ByteArrayOutputStream;
38+
import java.io.IOException;
39+
import java.io.InputStream;
40+
import java.io.InterruptedIOException;
41+
import java.io.OutputStream;
42+
import java.net.HttpURLConnection;
43+
import java.net.URL;
44+
import java.util.concurrent.ExecutorService;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicBoolean;
48+
49+
/**
50+
* @author Stuart Douglas
51+
*/
52+
public abstract class AbstractServletInputStreamTestCase {
53+
54+
public static final String HELLO_WORLD = "Hello World";
55+
public static final String BLOCKING_SERVLET = "blockingInput";
56+
public static final String ASYNC_SERVLET = "asyncInput";
57+
58+
@Test
59+
public void testBlockingServletInputStream() {
60+
StringBuilder builder = new StringBuilder(1000 * HELLO_WORLD.length());
61+
for (int i = 0; i < 10; ++i) {
62+
try {
63+
for (int j = 0; j < 1000; ++j) {
64+
builder.append(HELLO_WORLD);
65+
}
66+
String message = builder.toString();
67+
runTest(message, BLOCKING_SERVLET, false, false);
68+
} catch (Throwable e) {
69+
throw new RuntimeException("test failed with i equal to " + i, e);
70+
}
71+
}
72+
}
73+
74+
@Test
75+
public void testAsyncServletInputStream() {
76+
//for(int h = 0; h < 20 ; ++h) {
77+
StringBuilder builder = new StringBuilder(1000 * HELLO_WORLD.length());
78+
for (int i = 0; i < 10; ++i) {
79+
try {
80+
for (int j = 0; j < 10000; ++j) {
81+
builder.append(HELLO_WORLD);
82+
}
83+
String message = builder.toString();
84+
runTest(message, ASYNC_SERVLET, false, false);
85+
} catch (Throwable e) {
86+
throw new RuntimeException("test failed with i equal to " + i, e);
87+
}
88+
}
89+
//}
90+
}
91+
92+
@Test
93+
public void testAsyncServletInputStreamWithPreamble() {
94+
StringBuilder builder = new StringBuilder(2000 * HELLO_WORLD.length());
95+
for (int i = 0; i < 10; ++i) {
96+
try {
97+
for (int j = 0; j < 10000; ++j) {
98+
builder.append(HELLO_WORLD);
99+
}
100+
String message = builder.toString();
101+
runTest(message, ASYNC_SERVLET, true, false);
102+
} catch (Throwable e) {
103+
throw new RuntimeException("test failed with i equal to " + i, e);
104+
}
105+
}
106+
}
107+
108+
@Test
109+
public void testAsyncServletInputStreamInParallel() throws Exception {
110+
StringBuilder builder = new StringBuilder(100000 * HELLO_WORLD.length());
111+
for (int j = 0; j < 100000; ++j) {
112+
builder.append(HELLO_WORLD);
113+
}
114+
String message = builder.toString();
115+
runTestParallel(100, message, ASYNC_SERVLET, false, false);
116+
}
117+
118+
@Test
119+
public void testAsyncServletInputStreamInParallelOffIoThread() throws Exception {
120+
StringBuilder builder = new StringBuilder(100000 * HELLO_WORLD.length());
121+
for (int j = 0; j < 100000; ++j) {
122+
builder.append(HELLO_WORLD);
123+
}
124+
String message = builder.toString();
125+
runTestParallel(100, message, ASYNC_SERVLET, false, true);
126+
}
127+
128+
@Test
129+
public void testAsyncServletInputStreamOffIoThread() {
130+
StringBuilder builder = new StringBuilder(2000 * HELLO_WORLD.length());
131+
for (int i = 0; i < 10; ++i) {
132+
try {
133+
for (int j = 0; j < 10000; ++j) {
134+
builder.append(HELLO_WORLD);
135+
}
136+
String message = builder.toString();
137+
runTest(message, ASYNC_SERVLET, false, true);
138+
} catch (Throwable e) {
139+
throw new RuntimeException("test failed with i equal to " + i, e);
140+
}
141+
}
142+
}
143+
144+
@Test
145+
public void testAsyncServletInputStreamOffIoThreadWithPreamble() {
146+
StringBuilder builder = new StringBuilder(2000 * HELLO_WORLD.length());
147+
for (int i = 0; i < 10; ++i) {
148+
try {
149+
for (int j = 0; j < 10000; ++j) {
150+
builder.append(HELLO_WORLD);
151+
}
152+
String message = builder.toString();
153+
runTest(message, ASYNC_SERVLET, true, true);
154+
} catch (Throwable e) {
155+
throw new RuntimeException("test failed with i equal to " + i, e);
156+
}
157+
}
158+
}
159+
160+
@Test
161+
public void testAsyncServletInputStreamWithEmptyRequestBody() {
162+
String message = "";
163+
try {
164+
runTest(message, ASYNC_SERVLET, false, false);
165+
} catch (Throwable e) {
166+
throw new RuntimeException("test failed", e);
167+
}
168+
}
169+
170+
private void runTestViaJavaImpl(final String message, String url)
171+
throws IOException {
172+
HttpURLConnection urlcon = null;
173+
try {
174+
String uri = getBaseUrl() + "/servletContext/" + url;
175+
urlcon = (HttpURLConnection) new URL(uri).openConnection();
176+
urlcon.setInstanceFollowRedirects(true);
177+
urlcon.setRequestProperty("Connection", "close");
178+
urlcon.setRequestMethod("POST");
179+
urlcon.setDoInput(true);
180+
urlcon.setDoOutput(true);
181+
OutputStream os = urlcon.getOutputStream();
182+
os.write(message.getBytes());
183+
os.close();
184+
Assert.assertEquals(StatusCodes.OK, urlcon.getResponseCode());
185+
InputStream is = urlcon.getInputStream();
186+
187+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
188+
byte[] buf = new byte[256];
189+
int len;
190+
while ((len = is.read(buf)) > 0 ){
191+
bytes.write(buf, 0, len);
192+
}
193+
is.close();
194+
final String response = new String(bytes.toByteArray(), 0, bytes.size());
195+
if (!message.equals(response)) {
196+
System.out.println(String.format("response=%s", Hex.encodeHexString(response.getBytes())));
197+
}
198+
Assert.assertEquals(message, response);
199+
} finally {
200+
if (urlcon != null) {
201+
urlcon.disconnect();
202+
}
203+
}
204+
}
205+
206+
protected String getBaseUrl() {
207+
return DefaultServer.getDefaultServerURL();
208+
}
209+
210+
@Test
211+
public void testAsyncServletInputStream3() {
212+
String message = "to_user_id=7999&msg_body=msg3";
213+
for (int i = 0; i < 200; ++i) {
214+
try {
215+
runTestViaJavaImpl(message, ASYNC_SERVLET);
216+
} catch (Throwable e) {
217+
System.out.println("test failed with i equal to " + i);
218+
e.printStackTrace();
219+
throw new RuntimeException("test failed with i equal to " + i, e);
220+
}
221+
}
222+
}
223+
224+
225+
public void runTest(final String message, String url, boolean preamble, boolean offIOThread) throws IOException {
226+
TestHttpClient client = createClient();
227+
try {
228+
String uri = getBaseUrl() + "/servletContext/" + url;
229+
HttpPost post = new HttpPost(uri);
230+
if (preamble && !message.isEmpty()) {
231+
post.addHeader("preamble", Integer.toString(message.length() / 2));
232+
}
233+
if (offIOThread) {
234+
post.addHeader("offIoThread", "true");
235+
}
236+
post.setEntity(new StringEntity(message));
237+
HttpResponse result = client.execute(post);
238+
Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode());
239+
final String response = HttpClientUtils.readResponse(result);
240+
Assert.assertEquals(message.length(), response.length());
241+
Assert.assertEquals(message, response);
242+
} finally {
243+
client.getConnectionManager().shutdown();
244+
}
245+
}
246+
247+
public void runTestParallel(int concurrency, final String message, String url, boolean preamble, boolean offIOThread) throws Exception {
248+
CloseableHttpClient client = HttpClients.custom()
249+
.setMaxConnPerRoute(1000)
250+
.build();
251+
byte[] messageBytes = message.getBytes();
252+
try {
253+
ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
254+
AtomicBoolean failed = new AtomicBoolean();
255+
Runnable task = new Runnable() {
256+
@Override
257+
public void run() {
258+
if (failed.get()) {
259+
return;
260+
}
261+
try {
262+
String uri = getBaseUrl() + "/servletContext/" + url;
263+
HttpPost post = new HttpPost(uri);
264+
if (preamble && !message.isEmpty()) {
265+
post.addHeader("preamble", Integer.toString(message.length() / 2));
266+
}
267+
if (offIOThread) {
268+
post.addHeader("offIoThread", "true");
269+
}
270+
post.setEntity(new InputStreamEntity(
271+
// Server should wait for events from the client
272+
new RateLimitedInputStream(new ByteArrayInputStream(messageBytes))));
273+
CloseableHttpResponse result = client.execute(post);
274+
try {
275+
Assert.assertEquals(StatusCodes.OK, result.getStatusLine().getStatusCode());
276+
final String response = HttpClientUtils.readResponse(result);
277+
Assert.assertEquals(message.length(), response.length());
278+
Assert.assertEquals(message, response);
279+
} finally {
280+
result.close();
281+
}
282+
} catch (Throwable t) {
283+
if (failed.compareAndSet(false, true)) {
284+
t.printStackTrace();
285+
executorService.shutdownNow();
286+
}
287+
}
288+
}
289+
};
290+
for (int i = 0; i < concurrency * 5; i++) {
291+
executorService.submit(task);
292+
}
293+
executorService.shutdown();
294+
Assert.assertTrue(executorService.awaitTermination(70, TimeUnit.SECONDS));
295+
Assert.assertFalse(failed.get());
296+
} finally {
297+
client.close();
298+
}
299+
}
300+
301+
private static final class RateLimitedInputStream extends InputStream {
302+
private final InputStream in;
303+
private int count;
304+
305+
RateLimitedInputStream(InputStream in) {
306+
this.in = in;
307+
}
308+
309+
@Override
310+
public int read() throws IOException {
311+
if (count++ % 1000 == 0) {
312+
try {
313+
Thread.sleep(1);
314+
} catch (InterruptedException e) {
315+
throw new InterruptedIOException();
316+
}
317+
}
318+
return in.read();
319+
}
320+
321+
@Override
322+
public void close() throws IOException {
323+
in.close();
324+
}
325+
}
326+
327+
protected TestHttpClient createClient() {
328+
return new TestHttpClient();
329+
}
330+
331+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* JBoss, Home of Professional Open Source.
3+
* Copyright 2014 Red Hat, Inc., and individual contributors
4+
* as indicated by the @author tags.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package io.undertow.servlet.test.streams;
20+
21+
import io.undertow.server.handlers.RequestBufferingHandler;
22+
import io.undertow.servlet.ServletExtension;
23+
import io.undertow.servlet.api.DeploymentInfo;
24+
import io.undertow.servlet.api.ServletInfo;
25+
import io.undertow.servlet.test.util.DeploymentUtils;
26+
import io.undertow.testutils.DefaultServer;
27+
import org.junit.BeforeClass;
28+
import org.junit.runner.RunWith;
29+
30+
import javax.servlet.ServletContext;
31+
32+
/**
33+
* @author Carter Kozak
34+
*/
35+
@RunWith(DefaultServer.class)
36+
public class ServletInputStreamRequestBufferingTestCase extends AbstractServletInputStreamTestCase {
37+
38+
@BeforeClass
39+
public static void setup() {
40+
DeploymentUtils.setupServlet(
41+
new ServletExtension() {
42+
@Override
43+
public void handleDeployment(DeploymentInfo deploymentInfo, ServletContext servletContext) {
44+
deploymentInfo.addInitialHandlerChainWrapper(new RequestBufferingHandler.Wrapper(1));
45+
}
46+
},
47+
new ServletInfo(BLOCKING_SERVLET, BlockingInputStreamServlet.class)
48+
.addMapping("/" + BLOCKING_SERVLET),
49+
new ServletInfo(ASYNC_SERVLET, AsyncInputStreamServlet.class)
50+
.addMapping("/" + ASYNC_SERVLET)
51+
.setAsyncSupported(true));
52+
}
53+
}

0 commit comments

Comments
 (0)