Skip to content

OkHttpServer: support maxConcurrentCallsPerConnection (Fixes #11062). #11063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE;
static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE;
static final int MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);

private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
Expand Down Expand Up @@ -129,6 +130,7 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
int maxConcurrentCallsPerConnection = MAX_CONCURRENT_STREAMS;

OkHttpServerBuilder(
SocketAddress address, HandshakerSocketFactory handshakerSocketFactory) {
Expand Down Expand Up @@ -350,6 +352,18 @@ public OkHttpServerBuilder maxInboundMetadataSize(int bytes) {
return this;
}

/**
* The maximum number of concurrent calls permitted for each incoming connection. Defaults to no
* limit.
*/
@CanIgnoreReturnValue
public OkHttpServerBuilder maxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
checkArgument(maxConcurrentCallsPerConnection > 0,
"max must be positive: %s", maxConcurrentCallsPerConnection);
this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
return this;
}

/**
* Sets the maximum message size allowed to be received on the server. If not called, defaults to
* defaults to 4 MiB. The default provides protection to servers who haven't considered the
Expand Down
11 changes: 11 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
OkHttpSettingsUtil.set(settings,
OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
OkHttpSettingsUtil.set(settings,
OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
}
frameWriter.settings(settings);
if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
frameWriter.windowUpdate(
Expand Down Expand Up @@ -520,6 +524,7 @@ static final class Config {
final long permitKeepAliveTimeInNanos;
final long maxConnectionAgeInNanos;
final long maxConnectionAgeGraceInNanos;
final int maxConcurrentStreams;

public Config(
OkHttpServerBuilder builder,
Expand All @@ -544,6 +549,7 @@ public Config(
permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
}
}

Expand Down Expand Up @@ -638,6 +644,11 @@ public void headers(boolean outFinished,
newStream = streamId > lastStreamId;
if (newStream) {
lastStreamId = streamId;
if (config.maxConcurrentStreams <= streams.size()) {
streamError(streamId, ErrorCode.REFUSED_STREAM,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PROTOCOL_ERROR would handle implementation bugs better, because the streams wouldn't be retried. But there is a handshake race for new connections where the client may initially exceed the stream limit. REFUSED_STREAM is most helpful if the stream limit is changing over the life of the connection. To make this "really nice" we could set a flag in ackSettings() and do receivedSettingsAck ? ErrorCode.PROTOCOL_ERROR : ErrorCode.REFUSED_STREAM, because there's no valid reason for the client to exceed the stream limit after acking the settings.

But what you have is fine, and that'd be more annoying for you to test.

"Max concurrent stream reached. RFC7540 section 5.1.2");
return;
}
}
}

Expand Down
29 changes: 29 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,35 @@ public void keepAliveEnforcer_noticesActive() throws Exception {
eq(ByteString.encodeString("too_many_pings", GrpcUtil.US_ASCII)));
}

@Test
public void maxConcurrentCallsPerConnection_failsWithRst() throws Exception {
int maxConcurrentCallsPerConnection = 1;
serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection);
initTransport();
handshake();

ArgumentCaptor<Settings> settingsCaptor = ArgumentCaptor.forClass(Settings.class);
verify(clientFramesRead).settings(eq(false), settingsCaptor.capture());
final Settings settings = settingsCaptor.getValue();
assertThat(OkHttpSettingsUtil.get(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS))
.isEqualTo(maxConcurrentCallsPerConnection);

final List<Header> headers = Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService/doit"),
CONTENT_TYPE_HEADER,
TE_HEADER);

clientFrameWriter.headers(1, headers);
clientFrameWriter.headers(3, headers);
clientFrameWriter.flush();

assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).rstStream(3, ErrorCode.REFUSED_STREAM);
}

private void initTransport() throws Exception {
serverTransport = new OkHttpServerTransport(
new OkHttpServerTransport.Config(serverBuilder, Arrays.asList()),
Expand Down
Loading