Skip to content

servlet: Implement gRPC server as a Servlet #4738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a0c92df
POC: Implement gRPC server as a Servlet
dapengzhang0 Jul 31, 2018
8719377
better-logging
dapengzhang0 Aug 12, 2018
46d00ec
get rid of public ServerImpl
dapengzhang0 Aug 13, 2018
4fa6271
rename files
dapengzhang0 Aug 14, 2018
e793c7c
fix serverListener NPE
dapengzhang0 Aug 14, 2018
393fcf6
fix onSendingBytes()
dapengzhang0 Aug 15, 2018
706a3c9
favor ConcurrentLinkedQueue for library
dapengzhang0 Aug 16, 2018
5e11674
refactor ServletAdapterImpl to ServletAdapter
dapengzhang0 Aug 17, 2018
1776917
not to use CDI for simplicity
dapengzhang0 Aug 17, 2018
bb3a720
add GrpcServlet(ServletServerBuilder serverBuilder)
dapengzhang0 Oct 15, 2018
4c7cd4f
add UndertowInteropTest and UndertowAbstractTest
dapengzhang0 Nov 2, 2018
af7f6a3
fix codecov
dapengzhang0 Nov 15, 2018
2fc04fc
minor enhancements
dapengzhang0 Nov 16, 2018
6fb4fc2
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jan 2, 2019
8b78b6d
update example gradle files
dapengzhang0 Jan 3, 2019
680872b
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jan 7, 2019
b3473df
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jun 13, 2019
3728866
revert core/src/test/java/io/grpc/internal/AbstractTransportTest.java
dapengzhang0 Jun 13, 2019
58fce63
ignore some transport tests
dapengzhang0 Jun 13, 2019
5b02064
cleanup GrpcServlet constructor
dapengzhang0 Jun 13, 2019
d59f81e
better way to get authority
dapengzhang0 Jun 13, 2019
5b7496e
comment SPSC
dapengzhang0 Jul 5, 2019
e98e4f8
attributes.toBuilder()
dapengzhang0 Jul 5, 2019
6aa26e4
move writeState and WriteListener
dapengzhang0 Jul 5, 2019
c3dfaba
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Jul 8, 2019
44ea5f3
revert AbstractTransportTest reformat
dapengzhang0 Jul 8, 2019
ef5f879
temp
dapengzhang0 Jul 14, 2019
47e3e95
fix some of review comments
dapengzhang0 Jul 17, 2019
c5fc2bd
create util methods
dapengzhang0 Aug 7, 2019
63e8cb2
minor cleanup
dapengzhang0 Aug 7, 2019
986a885
Merge branch 'master' of https://github.com/grpc/grpc-java into servl…
dapengzhang0 Aug 11, 2019
3fd4ca8
factor out write path so that easy to replace with other implementation
dapengzhang0 Aug 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
better-logging
  • Loading branch information
dapengzhang0 committed Nov 1, 2018
commit 87193772ece028a9e971095d2b4c698905c2575d
2 changes: 1 addition & 1 deletion servlet/interop-testing/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ dependencies {
compile project(':grpc-servlet')

// for container that already supports CDI 2
// providedCompile group: 'javax.enterprise', name: 'cdi-api', version: '2.0.SP1'
// providedCompile group: 'javax.enterprise', name: 'cdi-api', version: '2.0.SP1'

// for container that needs CDI 2
compile group: 'javax.enterprise', name: 'cdi-api', version: '2.0.SP1'
Expand Down
87 changes: 65 additions & 22 deletions servlet/src/main/java/io/grpc/servlet/ServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import static io.grpc.servlet.ServerStream.ByteArrayWritableBuffer.FLUSH;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;

import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.AbstractServerStream;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.LogId;
import io.grpc.internal.ReadableBuffer;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.StatsTraceContext;
Expand All @@ -41,6 +45,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.IntStream;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
Expand All @@ -51,23 +56,27 @@

final class ServerStream extends AbstractServerStream {

static final Logger logger = Logger.getLogger(ServerStream.class.getName());

private final TransportState transportState =
new TransportState(Integer.MAX_VALUE, StatsTraceContext.NOOP, new TransportTracer());
final Sink sink;
final AsyncContext asyncCtx;
final AtomicReference<WriteState> writeState;
final WritableBufferChain writeChain;
final ScheduledExecutorService scheduler;
final LogId logId;

ServerStream(
WritableBufferAllocator bufferAllocator, AsyncContext asyncCtx,
AtomicReference<WriteState> writeState, WritableBufferChain writeChain,
ScheduledExecutorService scheduler) {
ScheduledExecutorService scheduler, LogId logId) {
super(bufferAllocator, StatsTraceContext.NOOP);
this.asyncCtx = asyncCtx;
this.writeState = writeState;
this.writeChain = writeChain;
this.scheduler = scheduler;
this.logId = logId;
this.sink = new Sink();
}

Expand Down Expand Up @@ -99,7 +108,7 @@ public void runOnTransportThread(Runnable r) {
@Override
public void bytesRead(int numBytes) {
// no-op
// TODO: flow control
// not able to do flow control
}

@Override
Expand Down Expand Up @@ -171,10 +180,8 @@ private static final class Entry {
boolean polled;
}

@Nonnull
Entry head;
@Nonnull
Entry tail;
Entry head; // not null
Entry tail; // not null

WritableBufferChain() {
head = new Entry();
Expand Down Expand Up @@ -262,12 +269,15 @@ final class Sink implements AbstractServerStream.Sink {

@Override
public void writeHeaders(Metadata headers) {
System.out.println("writeHeaders"); // TODO: better logging
// Discard any application supplied duplicates of the reserved headers
headers.discardAll(CONTENT_TYPE_KEY);
headers.discardAll(GrpcUtil.TE_HEADER);
headers.discardAll(GrpcUtil.USER_AGENT_KEY);

if (logger.isLoggable(FINE)) {
logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, headers});
}

resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType(CONTENT_TYPE_GRPC);

Expand All @@ -277,7 +287,6 @@ public void writeHeaders(Metadata headers) {
new String(serializedHeaders[i], StandardCharsets.US_ASCII),
new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
}
// resp.setHeader("trailer", "grpc-status"); // , grpc-message");
resp.setTrailerFields(trailerSupplier);
}

Expand All @@ -286,36 +295,53 @@ public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMes
if (frame == null && !flush) {
return;
}
if (frame != null && flush) {
writeFrame(frame, false, numMessages);
writeFrame(null, true, 0);
return;

if (logger.isLoggable(FINE)) {
logger.log(
FINE,
"[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
}
System.out.println("writeFrame flush = " + flush); // TODO: better logging

WriteState curState = writeState.get();
ByteArrayWritableBuffer byteBuffer = frame == null ? FLUSH : (ByteArrayWritableBuffer) frame;
if (frame != null) {
writeFrame((ByteArrayWritableBuffer) frame);
}

if (flush) {
writeFrame(FLUSH);
}
}

private void writeFrame(ByteArrayWritableBuffer byteBuffer) {
int numBytes = byteBuffer.readableBytes();
if (numBytes > 0) {
onSendingBytes(numBytes);
}

WriteState curState = writeState.get();
if (curState.stillWritePossible) {
try {
ServletOutputStream outputStream = resp.getOutputStream();
if (byteBuffer == FLUSH) {
resp.flushBuffer();
} else {
outputStream.write(byteBuffer.bytes, 0, byteBuffer.readableBytes());
transportState().onSentBytes(numBytes);
if (logger.isLoggable(FINEST)) {
logger.log(
FINEST,
"[{0}] outbound data: length = {1}, bytes = {2}",
new Object[]{
logId, numBytes, toHexString(byteBuffer.bytes, byteBuffer.readableBytes())});
}
}
if (!outputStream.isReady()) {
while (!writeState.compareAndSet(curState, curState.withStillWritePossible(false))) {
Thread.yield();
while (true) {
if (writeState.compareAndSet(curState, curState.withStillWritePossible(false))) {
return;
}
curState = writeState.get();
}
// TODO: better logging
System.out.println("writeFrame() - set stillWritePossible false");
}
} catch (IOException ioe) {
ioe.printStackTrace(); // TODO
Expand All @@ -325,15 +351,20 @@ public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMes
if (!writeState.compareAndSet(curState, curState.newState())) {
// state changed by another thread, need to check if stillWritePossible again
if (writeState.get().stillWritePossible && writeChain.poll() != null) {
writeFrame(frame, flush, numMessages);
writeFrame(byteBuffer);
}
}
}
}

@Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
System.out.println("writeTrailers"); // TODO: better logging
if (logger.isLoggable(FINE)) {
logger.log(
FINE,
"[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
new Object[] {logId, trailers, headersSent, status});
}
if (!headersSent) {
// Discard any application supplied duplicates of the reserved headers
trailers.discardAll(CONTENT_TYPE_KEY);
Expand Down Expand Up @@ -362,8 +393,8 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
while (true) {
WriteState curState = writeState.get();
if (curState.stillWritePossible) {
System.out.println("writeTrailers - complete"); // TODO: better logging
ServletAdapterImpl.asyncContextComplete(asyncCtx, scheduler);
logger.log(FINE, "[{0}] writeTrailers: call complete", logId);
return;
}
if (writeState.compareAndSet(curState, curState.withTrailersSent(true))) {
Expand Down Expand Up @@ -395,4 +426,16 @@ public Map<String, String> get() {
return trailers;
}
}

static String toHexString(byte[] bytes, int length) {
String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
if (length > 80) {
hex += "...";
}
if (length > 64) {
int offset = max(64, length - 16);
hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
}
return hex;
}
}
53 changes: 36 additions & 17 deletions servlet/src/main/java/io/grpc/servlet/ServletAdapterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.servlet.ServerStream.toHexString;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;

import io.grpc.Metadata;
import io.grpc.internal.LogId;
import io.grpc.internal.ReadableBuffers;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.WritableBufferAllocator;
Expand All @@ -31,6 +35,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PreDestroy;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
Expand All @@ -46,6 +52,8 @@
*/
final class ServletAdapterImpl implements ServletAdapter {

static final Logger logger = Logger.getLogger(ServerStream.class.getName());

private final ServerTransportListener transportListener;
private final ScheduledExecutorService scheduler;

Expand Down Expand Up @@ -78,12 +86,13 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) {
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation");
checkArgument(ServletAdapter.isGrpc(req), "req is not a gRPC request");

LogId logId = LogId.allocate(getClass().getName());
logger.log(FINE, "[{0}] RPC started", logId);

String method = req.getRequestURI().substring(1); // remove the leading "/"
Metadata headers = new Metadata();

System.out.println(req.getServletContext().getServerInfo()); // TODO: better logging
System.out.println("resp.bufferSize = " + resp.getBufferSize());

AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
AsyncContext asyncCtx = req.startAsync();

Expand All @@ -94,15 +103,15 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx
WritableBufferChain writeChain = new WritableBufferChain();

ServerStream stream =
new ServerStream(bufferAllocator, asyncCtx, writeState, writeChain, scheduler);
new ServerStream(bufferAllocator, asyncCtx, writeState, writeChain, scheduler, logId);
transportListener.streamCreated(stream, method, headers);
stream.transportState().onStreamAllocated();

output.setWriteListener(
new WriteListener() {
@Override
public void onWritePossible() throws IOException {
System.out.println("onWritePossible()"); // TODO: better logging
logger.log(FINE, "[{0}] onWritePossible", logId);

WriteState curState = writeState.get();
// curState.stillWritePossible should have been set to false already or right now
Expand All @@ -122,22 +131,31 @@ public void onWritePossible() throws IOException {
resp.flushBuffer();
} else {
output.write(buffer.bytes, 0, buffer.readableBytes());
stream.transportState().onSentBytes(buffer.readableBytes());

if (logger.isLoggable(Level.FINEST)) {
logger.log(
Level.FINEST,
"[{0}] outbound data: length = {1}, bytes = {2}",
new Object[]{
logId, buffer.readableBytes(),
toHexString(buffer.bytes, buffer.readableBytes())});
}
}
continue;
}

if (writeState.compareAndSet(curState, curState.withStillWritePossible(true))) {
// state has not changed since. It's possible a new entry is just enqueued into the
// writeChain, but this case is handled right after the enqueuing
// TODO: better logging
System.out.println("onWritePossible() - set stillWritePossible true");
break;
} // else state changed by another thread, need to drain the writeChain again
}

if (isReady && writeState.get().trailersSent) {
System.out.println("onWritePossible - complete");
asyncContextComplete(asyncCtx, scheduler);

logger.log(FINE, "[{0}] onWritePossible: call complete", logId);
}
}

Expand All @@ -152,20 +170,24 @@ public void onError(Throwable t) {
input.setReadListener(
new ReadListener() {
volatile boolean allDataRead;
volatile boolean readEos;
final byte[] buffer = new byte[4 * 1024];

@Override
public void onDataAvailable() throws IOException {
System.out.println("onDataAvailable"); // TODO: better logging
logger.log(FINE, "[{0}] onDataAvailable", logId);
while (input.isReady()) {
int length = input.read(buffer);
System.out.println("onDataAvailable: length = " + length);
if (length == -1) {
readEos = true;
System.out.println("onDataAvailable: finished = " + input.isFinished());
logger.log(FINEST, "[{0}] inbound data: read end of stream", logId);
return;
} else {
if (logger.isLoggable(FINEST)) {
logger.log(
FINEST,
"[{0}] inbound data: length = {1}, bytes = {2}",
new Object[]{logId, length, toHexString(buffer, length)});
}

stream
.transportState()
.inboundDataReceived(
Expand All @@ -177,10 +199,9 @@ public void onDataAvailable() throws IOException {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public void onAllDataRead() {
System.out.println("onAllDataRead"); // TODO: better logging
logger.log(FINE, "[{0}] onAllDataRead", logId);
if (input.isFinished() && !allDataRead) {
allDataRead = true;
System.out.println("onAllDataRead finished"); // TODO: better logging
ServletContext servletContext = asyncCtx.getRequest().getServletContext();
if (servletContext != null
&& servletContext.getServerInfo().contains("GlassFish Server")
Expand All @@ -196,11 +217,9 @@ public void onAllDataRead() {
1,
TimeUnit.MILLISECONDS);
} else {
System.out.println("read EOS: " + readEos);
stream
.transportState()
.inboundDataReceived(ReadableBuffers.wrap(new byte[] {}), true);
System.out.println("onAllDataRead - endOfStream received"); // TODO: better logging
}
}
}
Expand Down