Add flow-control and remove auto-read in netty4 HTTP pipeline #126441

Merged
merged 21 commits into from
Apr 11, 2025

Contributor

@mhl-b mhl-b commented Apr 8, 2025

Remove buffering and queueing in Netty4HttpRequestBodyStream and Netty4HttpHeaderValidator and replace it with netty's FlowControlHandler in HTTP pipeline. Disable channel auto-read and introduce explicit channel/ctx.read().

Add a few helper classes, MissingReadDetector and ReadSniffer, to detect missing reads in tests.

@mhl-b mhl-b added :Distributed Coordination/Network Http and internode communication implementations Team:Distributed Coordination Meta label for Distributed Coordination team >enhancement labels Apr 8, 2025
@elasticsearchmachine
Collaborator

Hi @mhl-b, I've created a changelog YAML for you.

@mhl-b mhl-b changed the title Add flow-control and remove auto-read in netty4 http pipeline Add flow-control and remove auto-read in netty4 HTTP pipeline Apr 8, 2025
@mhl-b mhl-b marked this pull request as ready for review April 8, 2025 08:29
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@mhl-b mhl-b requested a review from ywangd April 10, 2025 02:38
Contributor

@DaveCTurner DaveCTurner left a comment

Production code looks good to me, I have mostly just commented on the tests.

*/
public class ReadSniffer extends ChannelOutboundHandlerAdapter {

int readCnt;
Contributor

naming nit, we can afford the extra vowels here

Suggested change
int readCnt;
int readCount;

private long lastRead;
private ScheduledFuture<?> checker;

MissingReadDetector(TimeProvider timer, long missingReadInterval) {
Contributor

naming nit, let's record that it's milliseconds here

Suggested change
MissingReadDetector(TimeProvider timer, long missingReadInterval) {
MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {

@@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
if (msg instanceof LastHttpContent == false) {
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
}
} else {
streamContentSizeHandler.channelRead(ctx, msg);
Contributor

This branch is apparently not covered by the unit test suite, could we add a test that hits it?

Contributor Author

@mhl-b mhl-b Apr 10, 2025

org.elasticsearch.http.netty4.Netty4HttpHeaderValidatorTests#testWithFlowControlAndAggregator is covering it

Contributor Author

Also, next step is to remove aggregator from netty. There is no incentive to add more tests here, more to remove in coming days.
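
Editorial sketch: the extra `ctx.read()` in the aggregating branch above can be pictured with a plain-Java toy model. It assumes a pull-only source where nothing arrives unless someone asks for it; `Chunk` and `PullAggregatorSketch` are hypothetical names for illustration and are not Netty or Elasticsearch classes. The point is only that, while a request is still being assembled, no downstream handler has seen a message yet, so the aggregating stage has to request the next part itself.

```java
import java.util.Queue;

/** Hypothetical stand-in for an HTTP body part; `last` plays the role of LastHttpContent. */
final class Chunk {
    final String data;
    final boolean last;

    Chunk(String data, boolean last) {
        this.data = data;
        this.last = last;
    }
}

/** Toy aggregator over a pull-only source: nothing arrives unless read() is called. */
final class PullAggregatorSketch {
    private final Queue<Chunk> source;
    private final StringBuilder body = new StringBuilder();
    private boolean complete;
    private int reads;

    PullAggregatorSketch(Queue<Chunk> source) {
        this.source = source;
    }

    /** Models ctx.read(): pulls at most one chunk from the source. */
    void read() {
        reads++;
        Chunk next = source.poll();
        if (next != null) {
            onChunk(next);
        }
    }

    private void onChunk(Chunk chunk) {
        body.append(chunk.data);
        if (chunk.last) {
            complete = true; // the full request could now be handed downstream
        } else {
            read(); // downstream has seen nothing yet, so request the next part on its behalf
        }
    }

    boolean isComplete() {
        return complete;
    }

    String body() {
        return body.toString();
    }

    int reads() {
        return reads;
    }
}
```

In this model, a single external `read()` against a three-chunk source ends with the body fully assembled after three pulls; without the `read()` in the non-last branch it would stall after the first chunk.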

@@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
isContinueExpected = true;
} else {
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
Contributor

The request.decoderResult().isFailure() case above is also not covered by unit tests, can we add that?

assert ctx.channel().config().isAutoRead() == false : "auto-read should be always disabled";
if (msg instanceof HttpObject httpObject) {
if (httpObject.decoderResult().isFailure()) {
ctx.fireChannelRead(httpObject); // pass-through for decoding failures
Contributor

This line is also not covered by a unit test

serverTransport.getThreadPool().getThreadContext(),
activityTracker
);
var contentStream = new Netty4HttpRequestBodyStream(ctx, serverTransport.getThreadPool().getThreadContext());
Contributor

Apparently this is not touched by the unit tests, nor is the msg instanceof HttpContent branch below

Contributor Author

the pipelining test is covered primarily by Netty4IncrementalRequestHandlingIT

@@ -251,7 +251,7 @@ public void close() {
server.dispatcher.reqQueue.forEach(r -> r.request.getHttpRequest().release());
server.netty.stop();
server.threadPool.shutdownNow();
safeAwait(client.netty.config().group().shutdownGracefully());
Contributor

Should we fix the other shutdownGracefully() calls with default timeouts? Maybe in a follow-up?

Contributor Author

yes, we should, can do follow up

private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private final ThreadContext threadContext;
private final ThreadWatchdog.ActivityTracker activityTracker;
Contributor

We can't drop the ActivityTracker here, it's super-important for finding bugs that block the transport threads.

Contributor

Sorry hang on I think I see, is it now the case that all activity is tracked by Netty4HttpPipeliningHandler, i.e. we're handling every single HTTP chunk within org.elasticsearch.http.netty4.Netty4HttpPipeliningHandler#channelRead?

Contributor Author

@mhl-b mhl-b Apr 10, 2025

Right. Previously the stream could serve a chunk from its current buffer without calling channel read. Now a chunk always comes from a channel read.


// ensures that we read from channel when no current chunks available
// and pass next chunk downstream without holding
public void testReadFromChannel() {
Contributor

Can we adjust this test to match the new behaviour rather than just discarding it? It seems worth keeping to me.

Contributor Author

Every call is a channel read; pretty much every unit and integration test covers this.

import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Netty4HttpHeaderValidatorTests extends ESTestCase {
Contributor

I'm concerned that we're losing so many of these tests. They seem important.

Contributor Author

@mhl-b mhl-b Apr 10, 2025

There are many, most of them around buffering and internal state. Do you see anything in particular that is missing? I think the current unit test is small but robust. There are plenty of security tests in x-pack that run through this code too. The diff in GitHub is ugly; I should probably create a new test file and remove the old one.

TL;DR: the only missing one is the decoder failure test, as you already pointed out.

Here is the list of previous tests and their fate:

  • testValidationPausesAndResumesData, covered by testIgnoreReadWhenValidating
  • testValidatorDoesNotTweakAutoReadAfterValidationComplete, not relevant, there is no autoread
  • testContentForwardedAfterValidation, covered by testMixedValidationResults
  • testContentDroppedAfterValidationFailure, covered by testMixedValidationResults (need to move few lines in current test)
  • testValidationErrorForwardsAsDecoderErrorMessage, covered by testMixedValidationResults
  • testValidationExceptionForwardsAsDecoderErrorMessage, a missing one as you pointed out
  • testValidationHandlesMultipleQueuedUpMessages, not relevant, there is no queue
  • testValidationFailureRecoversForEnqueued, there is no queue
  • testValidationFailureRecoversForInbound, covered by testMixedValidationResults
  • testValidationSuccessForLargeMessage
  • testValidationFailureForLargeMessage - I can extend testMixedValidationResults that writes a random size of content. But there is no content size semantic in validator, it was relevant with queueing.
  • testFullRequest... there is no need for these, HttpRequestDecoder does not emit full requests. Validator has to be placed after HttpObjectAggregator to handle those.

@mhl-b
Contributor Author

mhl-b commented Apr 11, 2025

@DaveCTurner , addressed in 34f5884

  • naming changes
  • decoder failure unit tests
  • stream closure unit test
  • order of auto-read=off and first read in pipeline initialization

@mhl-b mhl-b requested a review from DaveCTurner April 11, 2025 00:21
Contributor

@DaveCTurner DaveCTurner left a comment

Ok thanks this all LGTM then. Could you fix the top-level comment to be something more appropriate for the Git commit (it's good info, you can move it into a reply, just doesn't really need to be in Git)

Member

@ywangd ywangd left a comment

This looks like a great PR that brings in a lot of simplification. I like it! Thanks for requesting the review. I don't really feel qualified to review all the changes without first spending significantly more time educating myself on Netty, though. David's approval should be sufficient. So I took this as a learning opportunity and read through the production code changes.

The code is definitely a lot easier to read. I only had minor comments. IIUC, the explicit read is sometimes called more often than needed, e.g. both Netty4HttpPipeliningHandler and Netty4HttpRequestBodyStream can call read when the stream ends? But that should not be a problem as long as we don't receive more data during header validation, for which there is an interception plus an assertion?

Thanks for working on this!

} else {
if (msg instanceof HttpRequest request) {
validate(ctx, request);
} else if (msg instanceof HttpContent content) {
Member

If there is no other possible type, should it be just an else branch with assert msg instanceof HttpContent?

Contributor Author

@mhl-b mhl-b Apr 11, 2025

If not for a bug in spotless (and if we could upgrade the spotless version), we could use guarded patterns in a switch statement, see #115750. We can't use newer Java features because of spotless, sad.

switch (msg) {
    case HttpObject obj when obj.decoderResult().isFailure() -> ctx.fireChannelRead(obj);
    case HttpRequest request -> validate(ctx, request);
    case HttpContent content when state == State.DROPPING -> {
        content.release();
        ctx.read();
    }
    case HttpContent content when state == State.PASSING -> ctx.fireChannelRead(content);
    default -> {
        assert false;
    }
}

Comment on lines 30 to 31
private boolean droppingContent;
private boolean validatingRequest;
Member

Nit: It feels these two booleans are mutually exclusive and hence can be combined as a single enum?
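
Editorial sketch: one way to picture the suggestion above is a single state field in place of the `droppingContent` and `validatingRequest` pair. This is a minimal plain-Java illustration under assumptions, not the code in the PR: the class name, the method names, and the exact transitions (in particular, that a new request always resets the state to validating) are hypothetical. The `State.DROPPING` and `State.PASSING` names echo the switch example earlier in this thread; `VALIDATING` is added here for illustration.

```java
/** Hypothetical single-field replacement for the droppingContent / validatingRequest pair. */
final class ValidatorStateSketch {
    enum State { PASSING, VALIDATING, DROPPING }

    private State state = State.PASSING;

    /** A new request header arrived and validation has started. */
    void onRequestReceived() {
        state = State.VALIDATING;
    }

    /** Validation finished: pass the content of valid requests, drop the rest. */
    void onValidationResult(boolean valid) {
        state = valid ? State.PASSING : State.DROPPING;
    }

    /** Read calls are swallowed while validation is in flight. */
    boolean shouldForwardRead() {
        return state != State.VALIDATING;
    }

    /** Content belonging to a rejected request is released instead of forwarded. */
    boolean shouldDropContent() {
        return state == State.DROPPING;
    }

    State state() {
        return state;
    }
}
```

With an enum the "validating and dropping at the same time" combination cannot be represented, which is the mutual exclusivity the comment points at.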

// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
validator.validate(httpRequest, ctx.channel(), listener);
if (msg instanceof HttpObject httpObject) {
Member

Similar here, it should not be possible to see a different type other than HttpObject? If so, we could use an assertion either here or in an else branch?

Comment on lines +49 to +50
if (now >= lastRead + interval) {
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
Member

I wonder whether it is worthwhile to warn faster if the last seen message is not either a FullHttpRequest or LastHttpContent?

Contributor Author

@mhl-b mhl-b Apr 11, 2025

What is the use case for this? Not reading from the stream at any time sounds equally bad. Transport code that missed a read at the end of a request, or a stream handler that forgot to read a chunk or close the stream, are both problematic.

@mhl-b
Contributor Author

mhl-b commented Apr 11, 2025

Thanks @DaveCTurner and @ywangd.

Netty4HttpPipeliningHandler and Netty4HttpRequestBodyStream can call read when the stream ends

Good eyes. Yes, it is. I don't think it's problematic at this time, since only bulk handles the stream and it can tolerate an unexpected read. But I changed it to only one read. It should be refactored in my next task, which moves the aggregator to the rest controller. Then every request in netty would be streamed.

@mhl-b
Contributor Author

mhl-b commented Apr 11, 2025

For the record, here is the previous PR description, before I made it Git-friendly:

This PR removes HTTP content buffering from Netty4HttpRequestBodyStream and Netty4HttpHeaderValidator and replaces it with netty's FlowControlHandler, which sits after the HTTP decoder. Flow control serves multiple purposes: it prevents content from invalid requests from passing downstream, and it provides backpressure and throttling when resources are constrained.

FlowControlHandler emits exactly one message (an HttpObject: HttpRequest, HttpContent, or LastHttpContent) per read, which is the desired behaviour for us in all cases.

@Override
protected void initChannel(Channel ch) throws Exception {
  // auto-read must be disabled all the time
  ch.config().setAutoRead(false);
  ...
  // from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
  ch.pipeline().addLast(new FlowControlHandler());
  if (Assertions.ENABLED) {
    // missing reads are hard to catch, but we can detect absence of reads within interval
    long missingReadIntervalMs = 10_000;
    ch.pipeline().addLast(new MissingReadDetector(transport.threadPool, missingReadIntervalMs));
  }
  ...
  // make very first read call, since auto-read is disabled; following reads must come from the handlers
  ch.read();
}

However, FlowControlHandler does not work when channel auto-read is enabled, so the second change in this PR replaces auto-read with explicit reads from channel handlers. With auto-read, netty reads and pushes bytes as they arrive, not when they are requested. If downstream code cannot keep up with the load, this "push" strategy can consume a lot of memory. Disabling auto-read changes the strategy from "push" to "pull", putting application code in charge of when to read from the channel. Although auto-read=off is clearly better for resource usage, it requires extra care when reading from the channel: a missing read leaves the channel in a non-operable state.
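
Editorial sketch: the "pull" strategy described above can be modelled in a few lines of plain Java. This is a toy model under stated assumptions, not Netty's FlowControlHandler: `PullGate` is a hypothetical name, and counting outstanding reads in an `int` is a simplification chosen for this example. It only shows the contract that a message is handed downstream when a read has been requested, and is held otherwise.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model of pull-based delivery: one queued message is released per read() call. */
final class PullGate<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private int demand; // reads requested but not yet satisfied (a simplification for this sketch)

    PullGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Upstream offers a message; it is delivered only if a read is outstanding. */
    void offer(T msg) {
        pending.add(msg);
        drain();
    }

    /** Downstream asks for exactly one more message. */
    void read() {
        demand++;
        drain();
    }

    private void drain() {
        while (demand > 0 && pending.isEmpty() == false) {
            demand--;
            downstream.accept(pending.poll());
        }
    }

    /** Number of messages held back because nobody has asked for them yet. */
    int buffered() {
        return pending.size();
    }
}
```

In this model, offering two messages before any `read()` delivers nothing; each subsequent `read()` releases exactly one message in arrival order. A handler that forgets to call `read()` simply never sees the next message, which is the "non-operable state" risk mentioned above.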

This PR also adds a utility handler, MissingReadDetector, that detects when there are no reads within a given time interval, when assertions are enabled. The absence of reads is hard to catch. It prints warnings such as:

[2025-04-10T00:45:49,104][WARN ][o.e.h.n.MissingReadDetector][node_t0][transport_worker][T#2] haven't read from channel for [12727ms]
[2025-04-10T00:45:59,104][WARN ][o.e.h.n.MissingReadDetector][node_t0][transport_worker][T#2] haven't read from channel for [22693ms]
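
Editorial sketch: the interval check behind warnings like these can be illustrated without Netty. The class below is a hypothetical stand-in (`MissingReadCheck` and its methods are names invented for this example), using an injected millisecond clock so it can be exercised deterministically. It mirrors only the `now >= lastRead + interval` comparison quoted in the review; scheduling and logging are left out.

```java
import java.util.function.LongSupplier;

/** Hypothetical, Netty-free sketch of a "no read within the interval" check. */
final class MissingReadCheck {
    private final LongSupplier clockMillis;
    private final long intervalMillis;
    private long lastReadMillis;

    MissingReadCheck(LongSupplier clockMillis, long intervalMillis) {
        this.clockMillis = clockMillis;
        this.intervalMillis = intervalMillis;
        this.lastReadMillis = clockMillis.getAsLong();
    }

    /** Call whenever a read is issued on the channel. */
    void onRead() {
        lastReadMillis = clockMillis.getAsLong();
    }

    /** Returns the milliseconds since the last read if a read is overdue, otherwise -1. */
    long overdueMillis() {
        long now = clockMillis.getAsLong();
        return now >= lastReadMillis + intervalMillis ? now - lastReadMillis : -1;
    }
}
```

A periodic task would call `overdueMillis()` and log a warning whenever the result is not -1; because the last-read timestamp is only reset by `onRead()`, repeated checks report a growing gap, as in the two log lines above.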

This change removes a lot of code related to correct buffering and to the need to synchronize the body stream and header validator on auto-read. It makes the implementation of both pretty straightforward. Right now Netty4HttpHeaderValidator intercepts and ignores read calls while validation is still in progress.

public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {
...
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        // until validation is completed we can ignore read calls,
        // once validation is finished HttpRequest will be fired and downstream can read from there
        if (validatingRequest == false) {
            ctx.read();
        }
    }
...
}

@mhl-b mhl-b merged commit c8805b8 into elastic:main Apr 11, 2025
17 checks passed
brianseeders added a commit to brianseeders/elasticsearch that referenced this pull request Apr 17, 2025
bcully added a commit to bcully/elasticsearch that referenced this pull request Apr 17, 2025
bcully pushed a commit that referenced this pull request Apr 17, 2025
…ipeline (#127030)

* Revert "Release buffers in netty test (#126744)"

This reverts commit f9f3def.

* Revert "Add flow-control and remove auto-read in netty4 HTTP pipeline (#126441)"

This reverts commit c8805b8.
ankikuma added a commit to ankikuma/elasticsearch that referenced this pull request Apr 17, 2025
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Apr 18, 2025
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Apr 23, 2025
Re-applying elastic#126441 with the extra `FlowControlHandler` needed to ensure
one-chunk-per-read semantics - see elastic#127111 for related tests.
DaveCTurner added a commit that referenced this pull request Apr 25, 2025
Re-applying #126441 with the extra `FlowControlHandler` needed to ensure
one-chunk-per-read semantics - see #127111 for related tests.
Labels
:Distributed Coordination/Network Http and internode communication implementations >enhancement Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0
4 participants