Skip to content

servlet: Implement gRPC server as a Servlet #4738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a0c92df
POC: Implement gRPC server as a Servlet
dapengzhang0 Jul 31, 2018
8719377
better-logging
dapengzhang0 Aug 12, 2018
46d00ec
get rid of public ServerImpl
dapengzhang0 Aug 13, 2018
4fa6271
rename files
dapengzhang0 Aug 14, 2018
e793c7c
fix serverListener NPE
dapengzhang0 Aug 14, 2018
393fcf6
fix onSendingBytes()
dapengzhang0 Aug 15, 2018
706a3c9
favor ConcurrentLinkedQueue for library
dapengzhang0 Aug 16, 2018
5e11674
refactor ServletAdapterImpl to ServletAdapter
dapengzhang0 Aug 17, 2018
1776917
not to use CDI for simplicity
dapengzhang0 Aug 17, 2018
bb3a720
add GrpcServlet(ServletServerBuilder serverBuilder)
dapengzhang0 Oct 15, 2018
4c7cd4f
add UndertowInteropTest and UndertowAbstractTest
dapengzhang0 Nov 2, 2018
af7f6a3
fix codecov
dapengzhang0 Nov 15, 2018
2fc04fc
minor enhancements
dapengzhang0 Nov 16, 2018
6fb4fc2
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jan 2, 2019
8b78b6d
update example gradle files
dapengzhang0 Jan 3, 2019
680872b
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jan 7, 2019
b3473df
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jun 13, 2019
3728866
revert core/src/test/java/io/grpc/internal/AbstractTransportTest.java
dapengzhang0 Jun 13, 2019
58fce63
ignore some transport tests
dapengzhang0 Jun 13, 2019
5b02064
cleanup GrpcServlet constructor
dapengzhang0 Jun 13, 2019
d59f81e
better way to get authority
dapengzhang0 Jun 13, 2019
5b7496e
comment SPSC
dapengzhang0 Jul 5, 2019
e98e4f8
attributes.toBuilder()
dapengzhang0 Jul 5, 2019
6aa26e4
move writeState and WriteListener
dapengzhang0 Jul 5, 2019
c3dfaba
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jul 8, 2019
44ea5f3
revert AbstractTransportTest reformat
dapengzhang0 Jul 8, 2019
ef5f879
temp
dapengzhang0 Jul 14, 2019
47e3e95
fix some of review comments
dapengzhang0 Jul 17, 2019
c5fc2bd
create util methods
dapengzhang0 Aug 7, 2019
63e8cb2
minor cleanup
dapengzhang0 Aug 7, 2019
986a885
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Aug 11, 2019
3fd4ca8
factor out write path so that easy to replace with other implementation
dapengzhang0 Aug 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
move writeState and WriteListener
  • Loading branch information
dapengzhang0 committed Jul 8, 2019
commit 6aa26e42427bd028cf92f7dd8b2963e880bb34a9
6 changes: 3 additions & 3 deletions examples/example-servlet/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
// ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
id 'com.google.protobuf' version '0.8.5'
id 'com.google.protobuf' version '0.8.8'
// Generate IntelliJ IDEA's .idea & .iml project files
id 'idea'
id 'war'
Expand All @@ -16,7 +16,7 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8

def grpcVersion = '1.23.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def protocVersion = '3.5.1-1'
def protocVersion = '3.7.1'

dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}",
Expand All @@ -30,7 +30,7 @@ dependencies {

protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins { grpc { artifact = "io.grpc:protoc-gen-grpc-java:1.13.1" } }
plugins { grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } }
generateProtoTasks {
all()*.plugins { grpc {} }
}
Expand Down
4 changes: 2 additions & 2 deletions servlet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ dependencies {
project(':grpc-core').sourceSets.test.output,
project(':grpc-netty').sourceSets.test.output,
libraries.junit,
'io.undertow:undertow-servlet:2.0.15.Final',
'org.apache.tomcat.embed:tomcat-embed-core:9.0.12'
'io.undertow:undertow-servlet:2.0.22.Final',
'org.apache.tomcat.embed:tomcat-embed-core:9.0.20'
}
130 changes: 3 additions & 127 deletions servlet/src/main/java/io/grpc/servlet/ServletAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static io.grpc.servlet.ServletServerStream.toHexString;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;

Expand All @@ -35,9 +34,6 @@
import io.grpc.internal.ReadableBuffers;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.WritableBufferAllocator;
import io.grpc.servlet.ServletServerStream.ByteArrayWritableBuffer;
import io.grpc.servlet.ServletServerStream.WriteState;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -47,20 +43,13 @@
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand Down Expand Up @@ -142,19 +131,10 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx
asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos));
StatsTraceContext statsTraceCtx =
StatsTraceContext.newServerContext(streamTracerFactories, method, headers);
AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
WritableBufferAllocator bufferAllocator =
capacityHint -> new ByteArrayWritableBuffer(capacityHint);

// SPSC queue would do
Queue<ByteArrayWritableBuffer> writeChain = new ConcurrentLinkedQueue<>();

ServletServerStream stream = new ServletServerStream(
bufferAllocator,
asyncCtx,
statsTraceCtx,
writeState,
writeChain,
maxInboundMessageSize,
attributes.toBuilder()
.set(
Expand All @@ -167,12 +147,8 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx
getAuthority(req),
logId);

asyncCtx.getResponse().getOutputStream().setWriteListener(
new GrpcWriteListener(stream, asyncCtx, writeState, writeChain, logId));

transportListener.streamCreated(stream, method, headers);
stream.transportState().runOnTransportThread(
() -> stream.transportState().onStreamAllocated());
stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);

asyncCtx.getRequest().getInputStream()
.setReadListener(new GrpcReadListener(stream, asyncCtx, logId));
Expand Down Expand Up @@ -238,7 +214,7 @@ public void onTimeout(AsyncEvent event) {
logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable());
}
// If the resp is not committed, cancel() to avoid being redirected to an error page.
// Else, the container will send RST_STREAM at the end.
// Else, the container will send RST_STREAM in the end.
if (!event.getAsyncContext().getResponse().isCommitted()) {
stream.cancel(Status.DEADLINE_EXCEEDED);
} else {
Expand Down Expand Up @@ -268,106 +244,6 @@ public void onError(AsyncEvent event) {
public void onStartAsync(AsyncEvent event) {}
}

private static final class GrpcWriteListener implements WriteListener {
final ServletServerStream stream;
final AsyncContext asyncCtx;
final HttpServletResponse resp;
final ServletOutputStream output;
final AtomicReference<WriteState> writeState;
final Queue<ByteArrayWritableBuffer> writeChain;
final InternalLogId logId;

GrpcWriteListener(
ServletServerStream stream,
AsyncContext asyncCtx,
AtomicReference<WriteState> writeState,
Queue<ByteArrayWritableBuffer> writeChain,
InternalLogId logId) throws IOException {
this.stream = stream;
this.asyncCtx = asyncCtx;
resp = (HttpServletResponse) asyncCtx.getResponse();
output = resp.getOutputStream();
this.writeState = writeState;
this.writeChain = writeChain;
this.logId = logId;
}

@Override
public void onWritePossible() throws IOException {
logger.log(FINEST, "[{0}] onWritePossible: ENTRY", logId);

WriteState curState = writeState.get();
// curState.stillWritePossible should have been set to false already or right now/
// It's very very unlikely stillWritePossible is true due to a race condition
while (curState.stillWritePossible) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
curState = writeState.get();
}

boolean isReady;
while ((isReady = output.isReady())) {
curState = writeState.get();

ByteArrayWritableBuffer buffer = writeChain.poll();
if (buffer != null) {
if (buffer == ByteArrayWritableBuffer.FLUSH) {
resp.flushBuffer();
} else {
output.write(buffer.bytes, 0, buffer.readableBytes());
stream.transportState().runOnTransportThread(
() -> stream.transportState().onSentBytes(buffer.readableBytes()));

if (logger.isLoggable(Level.FINEST)) {
logger.log(
Level.FINEST,
"[{0}] outbound data: length = {1}, bytes = {2}",
new Object[] {
logId,
buffer.readableBytes(),
toHexString(buffer.bytes, buffer.readableBytes())
});
}
}
continue;
}

if (writeState.compareAndSet(curState, curState.withStillWritePossible(true))) {
logger.log(FINEST, "[{0}] set stillWritePossible to true", logId);
// state has not changed since. It's possible a new entry is just enqueued into the
// writeChain, but this case is handled right after the enqueuing
break;
} // else state changed by another thread, need to drain the writeChain again
}

if (isReady && writeState.get().trailersSent) {
stream.transportState().runOnTransportThread(
() -> {
stream.transportState().complete();
asyncCtx.complete();
});
logger.log(FINEST, "[{0}] onWritePossible: call complete", logId);
}

logger.log(FINEST, "[{0}] onWritePossible: EXIT", logId);
}

@Override
public void onError(Throwable t) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
}

// If the resp is not committed, cancel() to avoid being redirected to an error page.
// Else, the container will send RST_STREAM at the end.
if (!asyncCtx.getResponse().isCommitted()) {
stream.cancel(Status.fromThrowable(t));
} else {
stream.transportState().runOnTransportThread(
() -> stream.transportState().transportReportStatus(Status.fromThrowable(t)));
}
}
}

private static final class GrpcReadListener implements ReadListener {
final ServletServerStream stream;
final AsyncContext asyncCtx;
Expand Down Expand Up @@ -400,7 +276,7 @@ public void onDataAvailable() throws IOException {
logger.log(
FINEST,
"[{0}] inbound data: length = {1}, bytes = {2}",
new Object[] {logId, length, toHexString(buffer, length)});
new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)});
}

byte[] copy = Arrays.copyOf(buffer, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final class ServletServerBuilder extends AbstractServerImplBuilder<Servle
/**
* Builds a gRPC server that can run as a servlet.
*
* <p>The returned server will not been started or be bound a port.
* <p>The returned server will not be started or bound to a port.
*
* <p>Users should not call this method directly. Instead users should call
* {@link #buildServletAdapter()} which internally will call {@code build()} and {@code start()}
Expand Down
Loading