Skip to content

Commit 8050723

Browse files
authored
OkHttpServer: support maxConcurrentCallsPerConnection (Fixes #11062). (#11063)
* Add option in OkHttpServerBuilder * Add value as MAX_CONCURRENT_STREAM setting in settings frame sent by the server to the client per connection * Enforce limit by sending a RST frame with REFUSED_STREAM error
1 parent e36f099 commit 8050723

File tree

3 files changed

+54
-0
lines changed

3 files changed

+54
-0
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java

+14
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
7474
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
7575
static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE;
7676
static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE;
77+
static final int MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
7778
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);
7879

7980
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
@@ -129,6 +130,7 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
129130
long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
130131
long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
131132
long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
133+
int maxConcurrentCallsPerConnection = MAX_CONCURRENT_STREAMS;
132134

133135
OkHttpServerBuilder(
134136
SocketAddress address, HandshakerSocketFactory handshakerSocketFactory) {
@@ -350,6 +352,18 @@ public OkHttpServerBuilder maxInboundMetadataSize(int bytes) {
350352
return this;
351353
}
352354

355+
/**
356+
* The maximum number of concurrent calls permitted for each incoming connection. Defaults to no
357+
* limit.
358+
*/
359+
@CanIgnoreReturnValue
360+
public OkHttpServerBuilder maxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
361+
checkArgument(maxConcurrentCallsPerConnection > 0,
362+
"max must be positive: %s", maxConcurrentCallsPerConnection);
363+
this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
364+
return this;
365+
}
366+
353367
/**
354368
* Sets the maximum message size allowed to be received on the server. If not called, defaults to
355369
* defaults to 4 MiB. The default provides protection to servers who haven't considered the

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

+11
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
219219
OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
220220
OkHttpSettingsUtil.set(settings,
221221
OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
222+
if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
223+
OkHttpSettingsUtil.set(settings,
224+
OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
225+
}
222226
frameWriter.settings(settings);
223227
if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
224228
frameWriter.windowUpdate(
@@ -520,6 +524,7 @@ static final class Config {
520524
final long permitKeepAliveTimeInNanos;
521525
final long maxConnectionAgeInNanos;
522526
final long maxConnectionAgeGraceInNanos;
527+
final int maxConcurrentStreams;
523528

524529
public Config(
525530
OkHttpServerBuilder builder,
@@ -544,6 +549,7 @@ public Config(
544549
permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
545550
maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
546551
maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
552+
maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
547553
}
548554
}
549555

@@ -638,6 +644,11 @@ public void headers(boolean outFinished,
638644
newStream = streamId > lastStreamId;
639645
if (newStream) {
640646
lastStreamId = streamId;
647+
if (config.maxConcurrentStreams <= streams.size()) {
648+
streamError(streamId, ErrorCode.REFUSED_STREAM,
649+
"Max concurrent stream reached. RFC7540 section 5.1.2");
650+
return;
651+
}
641652
}
642653
}
643654

okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -1264,6 +1264,35 @@ public void keepAliveEnforcer_noticesActive() throws Exception {
12641264
eq(ByteString.encodeString("too_many_pings", GrpcUtil.US_ASCII)));
12651265
}
12661266

1267+
@Test
1268+
public void maxConcurrentCallsPerConnection_failsWithRst() throws Exception {
1269+
int maxConcurrentCallsPerConnection = 1;
1270+
serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection);
1271+
initTransport();
1272+
handshake();
1273+
1274+
ArgumentCaptor<Settings> settingsCaptor = ArgumentCaptor.forClass(Settings.class);
1275+
verify(clientFramesRead).settings(eq(false), settingsCaptor.capture());
1276+
final Settings settings = settingsCaptor.getValue();
1277+
assertThat(OkHttpSettingsUtil.get(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS))
1278+
.isEqualTo(maxConcurrentCallsPerConnection);
1279+
1280+
final List<Header> headers = Arrays.asList(
1281+
HTTP_SCHEME_HEADER,
1282+
METHOD_HEADER,
1283+
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
1284+
new Header(Header.TARGET_PATH, "/com.example/SimpleService/doit"),
1285+
CONTENT_TYPE_HEADER,
1286+
TE_HEADER);
1287+
1288+
clientFrameWriter.headers(1, headers);
1289+
clientFrameWriter.headers(3, headers);
1290+
clientFrameWriter.flush();
1291+
1292+
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
1293+
verify(clientFramesRead).rstStream(3, ErrorCode.REFUSED_STREAM);
1294+
}
1295+
12671296
private void initTransport() throws Exception {
12681297
serverTransport = new OkHttpServerTransport(
12691298
new OkHttpServerTransport.Config(serverBuilder, Arrays.asList()),

0 commit comments

Comments
 (0)