-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Add flow-control and remove auto-read in netty4 HTTP pipeline #126441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Hi @mhl-b, I've created a changelog YAML for you. |
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Production code looks good to me, I have mostly just commented on the tests.
*/ | ||
public class ReadSniffer extends ChannelOutboundHandlerAdapter { | ||
|
||
int readCnt; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming nit, we can afford the extra vowels here
int readCnt; | |
int readCount; |
private long lastRead; | ||
private ScheduledFuture<?> checker; | ||
|
||
MissingReadDetector(TimeProvider timer, long missingReadInterval) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming nit, let's record that it's milliseconds here
MissingReadDetector(TimeProvider timer, long missingReadInterval) { | |
MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) { |
@@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception | |||
} | |||
if (aggregating || msg instanceof FullHttpRequest) { | |||
super.channelRead(ctx, msg); | |||
if (msg instanceof LastHttpContent == false) { | |||
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf | |||
} | |||
} else { | |||
streamContentSizeHandler.channelRead(ctx, msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This branch is apparently not covered by the unit test suite, could we add a test that hits it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
org.elasticsearch.http.netty4.Netty4HttpHeaderValidatorTests#testWithFlowControlAndAggregator
is covering it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, next step is to remove aggregator from netty. There is no incentive to add more tests here, more to remove in coming days.
@@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { | |||
isContinueExpected = true; | |||
} else { | |||
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); | |||
ctx.read(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The request.decoderResult().isFailure()
case above is also not covered by unit tests, can we add that?
assert ctx.channel().config().isAutoRead() == false : "auto-read should be always disabled"; | ||
if (msg instanceof HttpObject httpObject) { | ||
if (httpObject.decoderResult().isFailure()) { | ||
ctx.fireChannelRead(httpObject); // pass-through for decoding failures |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is also not covered by a unit test
serverTransport.getThreadPool().getThreadContext(), | ||
activityTracker | ||
); | ||
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently this is not touched by the unit tests, nor is the msg instanceof HttpContent
branch below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the pipelining test is covered primarily by Netty4IncrementalRequestHandlingIT
@@ -251,7 +251,7 @@ public void close() { | |||
server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release()); | |||
server.netty.stop(); | |||
server.threadPool.shutdownNow(); | |||
safeAwait(client.netty.config().group().shutdownGracefully()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we fix the other shutdownGracefully()
calls with default timeouts? Maybe in a follow-up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we should, can do follow up
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4); | ||
private final ThreadContext threadContext; | ||
private final ThreadWatchdog.ActivityTracker activityTracker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't drop the ActivityTracker
here, it's super-important for finding bugs that block the transport threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry hang on I think I see, is it now the case that all activity is tracked by Netty4HttpPipeliningHandler
, i.e. we're handling every single HTTP chunk within org.elasticsearch.http.netty4.Netty4HttpPipeliningHandler#channelRead
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, before stream could serve chunk from current buffer without calling channel read. Now it is always from channel read.
|
||
// ensures that we read from channel when no current chunks available | ||
// and pass next chunk downstream without holding | ||
public void testReadFromChannel() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we adjust this test to match the new behaviour rather than just discarding it? It seems worth keeping to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every call is channel read, pretty much every test unit and integ cover this.
import static org.hamcrest.Matchers.nullValue; | ||
import static org.hamcrest.Matchers.sameInstance; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
|
||
public class Netty4HttpHeaderValidatorTests extends ESTestCase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm concerned that we're losing so many of these tests. They seem important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are many, most of them around buffering and internal state. Do you see anything particular is missing? I think current unit test small but robust. There are plenty of security tests in x-pack that run through this code too. The diff in github is ugly, I should probably create new test file and remove old one.
TLDR. Only missing one is decoder failure test as you pointed out already.
Here is list of previous tests and their fate:
testValidationPausesAndResumesData
, covered bytestIgnoreReadWhenValidating
testValidatorDoesNotTweakAutoReadAfterValidationComplete
, not relevant, there is no autoreadtestContentForwardedAfterValidation
, covered bytestMixedValidationResults
testContentDroppedAfterValidationFailure
, covered bytestMixedValidationResults
(need to move few lines in current test)testValidationErrorForwardsAsDecoderErrorMessage
, covered bytestMixedValidationResults
testValidationExceptionForwardsAsDecoderErrorMessage
, a missing one as you pointed outtestValidationHandlesMultipleQueuedUpMessages
, not relevant, there is no queuetestValidationFailureRecoversForEnqueued
, there is no queuetestValidationFailureRecoversForInbound
, covered bytestMixedValidationResults
testValidationSuccessForLargeMessage
testValidationFailureForLargeMessage
- I can extendtestMixedValidationResults
that writes a random size of content. But there is no content size semantic in validator, it was relevant with queueing.testFullRequest...
there is no need for these,HttpRequestDecoder
does not emit full requests. Validator has to be placed afterHttpObjectAggregator
to handle those.
@DaveCTurner , addressed in 34f5884
|
…search into netty-flow-control-handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok thanks this all LGTM then. Could you fix the top-level comment to be something more appropriate for the Git commit (it's good info, you can move it into a reply, just doesn't really need to be in Git)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a great PR that brings in a lot simplification. I like it! Thanks for requesting the reivew. I don't really feel qualified to review all the changes without spending significant more time on educating myself first on Netty though. David's approval should be sufficient. So I took this as a learning opportunity and read through the production code changes.
The code is definitely a lot earier to read. I only had minor comments. IIUC, the explicit read
is sometimes called more often than needed, e.g. both Netty4HttpPipeliningHandler
and Netty4HttpRequestBodyStream
can call read
when the stream ends? But that should not be a problem as long as we don't receive more data during header validation, for which there is an interception plus an assertion?
Thanks for working on this!
} else { | ||
if (msg instanceof HttpRequest request) { | ||
validate(ctx, request); | ||
} else if (msg instanceof HttpContent content) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is no other possible type, should it be just an else
branch with assert msg instanceof HttpContent
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not bug in spotless and we could upgrade spotless version, we could use guarded patterns in switch statement. #115750. Cant use java features because of spotless, sad.
switch (msg) {
case HttpObject obj when obj.decoderResult().isFailure() -> ctx.fireChannelRead(obj);
case HttpRequest request -> validate(ctx, request);
case HttpContent content when state == State.DROPPING -> {
content.release();
ctx.read();
}
case HttpContent content when state == State.PASSING -> ctx.fireChannelRead(content);
default -> {
assert false;
}
}
private boolean droppingContent; | ||
private boolean validatingRequest; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It feels these two booleans are mutually exclusive and hence can be combined as a single enum?
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused | ||
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { | ||
validator.validate(httpRequest, ctx.channel(), listener); | ||
if (msg instanceof HttpObject httpObject) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar here, it should not be possible to see a different type other than HttpObject? If so, we could use an assertion either here or in an else
branch?
if (now >= lastRead + interval) { | ||
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether it is worthwhile to warn faster if the last seen message is not either a FullHttpRequest
or LastHttpContent
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is use-case for this? Not reading from stream at anytime sounds equally bad. Our transport code that missed read at the end of request or stream handler that forgot to read chunk or close stream, both problematic.
Thanks @DaveCTurner and @ywangd.
Good eyes. Yes it is. I dont think it's problematic at this time, since only bulk is handling stream and can tolerate unexpected read. But I changed it to only one read. It should be refactored in my next task to move aggregator to the rest controller. And every request in netty would be streamed. |
For the history, previous PR description, before making it git friendly: This PR removes HTTP content buffering from Netty4HttpRequestBodyStream and Netty4HttpHeaderValidator and replace it with netty's FlowControlHandler that sits after HTTP decoder. The flow control is required for multiple purposes - prevent content from invalid requests pass to the downstream, provide backpressure and throttling when resources are constrained. FlowControlHandler emits exactly one message(HttpObject: HttpRequest, HttpContent, HttpLastContent) per read. Which is desirable behaviour for us in all cases. @Override
protected void initChannel(Channel ch) throws Exception {
// auto-read must be disabled all the time
ch.config().setAutoRead(false);
...
// from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
ch.pipeline().addLast(new FlowControlHandler());
if (Assertions.ENABLED) {
// missing reads are hard to catch, but we can detect absence of reads within interval
long missingReadIntervalMs = 10_000;
ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
}
...
// make very first read call, since auto-read is disabled; following reads must come from the handlers
ch.read();
} But FlowControlHandler does not work when channel auto-read is enabled. Second change in this PR is replacement of auto-read by explicit read's from channel handlers. The auto-read feature in netty will read and push bytes as they come, not when requested. If downstream code cannot catch up with load, this "push" strategy can consume a lot of memory. Disabling auto-read changes strategy from "push" to "pull", making application code in charge when to read from the channel. Despite being obviously better for resource usage the auto-read=off requires extra care for reading from channel, missing read will leave channel in a non-operable state. Also add a utility handler MissingReadDetector that detects if there no reads in given time interval when assertions are enabled. Absence of reads are hard to catch. Prints warnings:
This change removes a lot of code related to correct buffering and need to synchronize body-stream and header-validator on auto-read. It makes implementation pretty straightforward for both. Right now Netty4HttpHeaderValidator will intercept and ignore read calls when validation is still going. public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
...
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
// until validation is completed we can ignore read calls,
// once validation is finished HttpRequest will be fired and downstream can read from there
if (validatingRequest == false) {
ctx.read();
}
}
...
} |
…elastic#126441)" This reverts commit c8805b8.
…elastic#126441)" This reverts commit c8805b8.
…elastic#126441)" This reverts commit c8805b8. Commit
…ty4 HTTP pipeline This reverts commit 2a243d8.
Re-applying elastic#126441 with the extra `FlowControlHandler` needed to ensure one-chunk-per-read semantics - see elastic#127111 for related tests.
Remove buffering and queueing in
Netty4HttpRequestBodyStream
andNetty4HttpHeaderValidator
and replace it with netty'sFlowControlHandler
in HTTP pipeline. Disable channelauto-read
and introduce explicitchannel
/ctx.read()
.Add a few helper classes to detect missing reads in tests
MissingReadDetector
andReadSniffer
.