Skip to content

Commit d6748f9

Browse files
authored
Group and flush add-responses after journal sync (apache#3837)
### Motivation Note: this is stacked on top of apache#3830 & apache#3835 This change improves the way the AddRequest responses are sent to the client. The current flow is: * The journal-force-thread issues the fsync on the journal file * We iterate over all the entries that were just synced and for each of them: 1. Trigger channel.writeAndFlush() 2. This will jump on the connection IO thread (Netty will use a `write()` to `eventfd` to post the task and wake the epoll) 3. Write the object in the connection and trigger the serialization logic 4. Grab a `ByteBuf` from the pool and write ~20 bytes with the response 5. Write and flush the buffer on the channel 6. With the flush consolidator we try to group multiple buffers into a single `writev()` syscall, though each call will have a long list of buffers, making the memcpy inefficient. 7. Release all the buffers and return them to the pool All these steps are quite expensive when the bookie is receiving a lot of small requests. This PR changes the flow into: 1. journal fsync 2. go through each request and prepare the response into a per-connection `ByteBuf` which is not yet written to the channel 3. after preparing all the responses, we flush them at once: trigger an event on all the connections that will write the accumulated buffers. The advantages are: 1. One ByteBuf allocated per connection instead of one per request 2. Fewer allocations and less stress on the buffer pool 3. More efficient socket write() operations 4. One task per connection posted on the Netty IO threads, instead of one per request.
1 parent 08c3138 commit d6748f9

File tree

11 files changed

+126
-34
lines changed

11 files changed

+126
-34
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.PrimitiveIterator;
2424
import java.util.concurrent.CompletableFuture;
2525
import org.apache.bookkeeper.common.util.Watcher;
26+
import org.apache.bookkeeper.processor.RequestProcessor;
2627
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
2728

2829
/**
@@ -86,6 +87,8 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
8687
// TODO: Should be constructed and passed in as a parameter
8788
LedgerStorage getLedgerStorage();
8889

90+
void setRequestProcessor(RequestProcessor requestProcessor);
91+
8992
// TODO: Move this exceptions somewhere else
9093
/**
9194
* Exception is thrown when no such a ledger is found in this bookie.

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.apache.bookkeeper.net.BookieId;
7070
import org.apache.bookkeeper.net.BookieSocketAddress;
7171
import org.apache.bookkeeper.net.DNS;
72+
import org.apache.bookkeeper.processor.RequestProcessor;
7273
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
7374
import org.apache.bookkeeper.stats.NullStatsLogger;
7475
import org.apache.bookkeeper.stats.StatsLogger;
@@ -1281,4 +1282,11 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
12811282
}
12821283
}
12831284
}
1285+
1286+
@Override
1287+
public void setRequestProcessor(RequestProcessor requestProcessor) {
1288+
for (Journal journal : journals) {
1289+
journal.setRequestProcessor(requestProcessor);
1290+
}
1291+
}
12841292
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.bookkeeper.common.util.MemoryLimitController;
5252
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
5353
import org.apache.bookkeeper.conf.ServerConfiguration;
54+
import org.apache.bookkeeper.processor.RequestProcessor;
5455
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
5556
import org.apache.bookkeeper.stats.Counter;
5657
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -444,6 +445,8 @@ private class ForceWriteThread extends BookieCriticalThread {
444445
// This holds the queue entries that should be notified after a
445446
// successful force write
446447
Thread threadToNotifyOnEx;
448+
449+
RequestProcessor requestProcessor;
447450
// should we group force writes
448451
private final boolean enableGroupForceWrites;
449452
private final Counter forceWriteThreadTime;
@@ -499,6 +502,10 @@ public void run() {
499502
journalStats.getForceWriteGroupingCountStats()
500503
.registerSuccessfulValue(numReqInLastForceWrite);
501504

505+
if (requestProcessor != null) {
506+
requestProcessor.flushPendingResponses();
507+
}
508+
502509
} catch (IOException ioe) {
503510
LOG.error("I/O exception in ForceWrite thread", ioe);
504511
running = false;
@@ -1093,6 +1100,10 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
10931100
numEntriesToFlush--;
10941101
entry.run();
10951102
}
1103+
1104+
if (forceWriteThread.requestProcessor != null) {
1105+
forceWriteThread.requestProcessor.flushPendingResponses();
1106+
}
10961107
}
10971108

10981109
lastFlushPosition = bc.position();
@@ -1211,6 +1222,10 @@ public BufferedChannelBuilder getBufferedChannelBuilder() {
12111222
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
12121223
}
12131224

1225+
public void setRequestProcessor(RequestProcessor requestProcessor) {
1226+
forceWriteThread.requestProcessor = requestProcessor;
1227+
}
1228+
12141229
/**
12151230
* Shuts down the journal.
12161231
*/

bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,9 @@ public interface RequestProcessor extends AutoCloseable {
4242
* channel received the given request <i>r</i>
4343
*/
4444
void processRequest(Object r, BookieRequestHandler channel);
45+
46+
/**
47+
* Flush any pending response staged on all the client connections.
48+
*/
49+
void flushPendingResponses();
4550
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,14 @@ public Object decode(ByteBuf buffer)
334334
throw new IllegalStateException("Received unknown response : op code = " + opCode);
335335
}
336336
}
337+
338+
public static void serializeAddResponseInto(int rc, BookieProtocol.ParsedAddRequest req, ByteBuf buf) {
339+
buf.writeInt(RESPONSE_HEADERS_SIZE); // Frame size
340+
buf.writeInt(PacketHeader.toInt(req.getProtocolVersion(), req.getOpCode(), (short) 0));
341+
buf.writeInt(rc); // rc-code
342+
buf.writeLong(req.getLedgerId());
343+
buf.writeLong(req.getEntryId());
344+
}
337345
}
338346

339347
/**
@@ -504,7 +512,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
504512
if (LOG.isTraceEnabled()) {
505513
LOG.trace("Encode response {} to channel {}.", msg, ctx.channel());
506514
}
507-
if (msg instanceof BookkeeperProtocol.Response) {
515+
516+
if (msg instanceof ByteBuf) {
517+
ctx.write(msg, promise);
518+
} else if (msg instanceof BookkeeperProtocol.Response) {
508519
ctx.write(repV3.encode(msg, ctx.alloc()), promise);
509520
} else if (msg instanceof BookieProtocol.Response) {
510521
ctx.write(repPreV3.encode(msg, ctx.alloc()), promise);

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,31 @@
2020
*/
2121
package org.apache.bookkeeper.proto;
2222

23+
import io.netty.buffer.ByteBuf;
2324
import io.netty.channel.ChannelHandlerContext;
2425
import io.netty.channel.ChannelInboundHandlerAdapter;
2526
import io.netty.channel.group.ChannelGroup;
2627
import java.nio.channels.ClosedChannelException;
28+
import lombok.extern.slf4j.Slf4j;
2729
import org.apache.bookkeeper.conf.ServerConfiguration;
2830
import org.apache.bookkeeper.processor.RequestProcessor;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3131

3232
/**
3333
* Serverside handler for bookkeeper requests.
3434
*/
35+
@Slf4j
3536
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {
3637

37-
private static final Logger LOG = LoggerFactory.getLogger(BookieRequestHandler.class);
38+
static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object();
39+
3840
private final RequestProcessor requestProcessor;
3941
private final ChannelGroup allChannels;
4042

4143
private ChannelHandlerContext ctx;
4244

45+
private ByteBuf pendingSendResponses = null;
46+
private int maxPendingResponsesSize;
47+
4348
BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
4449
this.requestProcessor = processor;
4550
this.allChannels = allChannels;
@@ -51,7 +56,7 @@ public ChannelHandlerContext ctx() {
5156

5257
@Override
5358
public void channelActive(ChannelHandlerContext ctx) throws Exception {
54-
LOG.info("Channel connected {}", ctx.channel());
59+
log.info("Channel connected {}", ctx.channel());
5560
this.ctx = ctx;
5661
super.channelActive(ctx);
5762
}
@@ -63,16 +68,16 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
6368

6469
@Override
6570
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
66-
LOG.info("Channels disconnected: {}", ctx.channel());
71+
log.info("Channels disconnected: {}", ctx.channel());
6772
}
6873

6974
@Override
7075
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
7176
if (cause instanceof ClosedChannelException) {
72-
LOG.info("Client died before request could be completed on {}", ctx.channel(), cause);
77+
log.info("Client died before request could be completed on {}", ctx.channel(), cause);
7378
return;
7479
}
75-
LOG.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
80+
log.error("Unhandled exception occurred in I/O thread or handler on {}", ctx.channel(), cause);
7681
ctx.close();
7782
}
7883

@@ -84,4 +89,34 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
8489
}
8590
requestProcessor.processRequest(msg, this);
8691
}
92+
93+
public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
94+
if (pendingSendResponses == null) {
95+
pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0
96+
? maxPendingResponsesSize : 256);
97+
}
98+
99+
BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses);
100+
}
101+
102+
@Override
103+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
104+
if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) {
105+
synchronized (this) {
106+
if (pendingSendResponses != null) {
107+
maxPendingResponsesSize = Math.max(maxPendingResponsesSize,
108+
pendingSendResponses.readableBytes());
109+
if (ctx.channel().isActive()) {
110+
ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise());
111+
} else {
112+
pendingSendResponses.release();
113+
}
114+
115+
pendingSendResponses = null;
116+
}
117+
}
118+
} else {
119+
super.userEventTriggered(ctx, evt);
120+
}
121+
}
87122
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.protobuf.ByteString;
3131
import io.netty.buffer.ByteBufAllocator;
3232
import io.netty.channel.Channel;
33+
import io.netty.channel.group.ChannelGroup;
3334
import io.netty.handler.ssl.SslHandler;
3435
import io.netty.util.HashedWheelTimer;
3536
import io.netty.util.concurrent.Future;
@@ -118,6 +119,8 @@ public class BookieRequestProcessor implements RequestProcessor {
118119
final Semaphore addsSemaphore;
119120
final Semaphore readsSemaphore;
120121

122+
final ChannelGroup allChannels;
123+
121124
// to temporary blacklist channels
122125
final Optional<Cache<Channel, Boolean>> blacklistedChannels;
123126
final Consumer<Channel> onResponseTimeout;
@@ -127,9 +130,11 @@ public class BookieRequestProcessor implements RequestProcessor {
127130
private final boolean throttleReadResponses;
128131

129132
public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
130-
SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
133+
SecurityHandlerFactory shFactory, ByteBufAllocator allocator,
134+
ChannelGroup allChannels) throws SecurityException {
131135
this.serverCfg = serverCfg;
132136
this.allocator = allocator;
137+
this.allChannels = allChannels;
133138
this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
134139
this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
135140
this.bookie = bookie;
@@ -694,6 +699,13 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie
694699
}
695700
}
696701

702+
@Override
703+
public void flushPendingResponses() {
704+
for (Channel c : allChannels) {
705+
c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES);
706+
}
707+
}
708+
697709
public long getWaitTimeoutOnBackpressureMillis() {
698710
return waitTimeoutOnBackpressureMillis;
699711
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,11 @@ public BookieServer(ServerConfiguration conf,
102102

103103
shFactory = SecurityProviderFactoryFactory
104104
.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
105+
105106
this.requestProcessor = new BookieRequestProcessor(conf, bookie,
106-
statsLogger.scope(SERVER_SCOPE), shFactory, allocator);
107+
statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels);
107108
this.nettyServer.setRequestProcessor(this.requestProcessor);
109+
this.bookie.setRequestProcessor(this.requestProcessor);
108110
}
109111

110112
/**

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ public void writeComplete(int rc, long ledgerId, long entryId,
122122
requestProcessor.getRequestStats().getAddEntryStats()
123123
.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
124124
}
125-
sendWriteReqResponse(rc,
126-
ResponseBuilder.buildAddResponse(request),
127-
requestProcessor.getRequestStats().getAddRequestStats());
125+
126+
requestHandler.prepareSendResponseV2(rc, request);
127+
requestProcessor.onAddRequestFinish();
128+
128129
request.recycle();
129130
recycle();
130131
}

bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import io.netty.buffer.UnpooledByteBufAllocator;
3434
import io.netty.channel.Channel;
3535
import io.netty.channel.ChannelHandlerContext;
36+
import io.netty.channel.group.ChannelGroup;
37+
import io.netty.channel.group.DefaultChannelGroup;
3638
import org.apache.bookkeeper.bookie.Bookie;
3739
import org.apache.bookkeeper.conf.ServerConfiguration;
3840
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -53,20 +55,24 @@ public class TestBookieRequestProcessor {
5355

5456
final BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
5557

58+
private final ChannelGroup channelGroup = new DefaultChannelGroup(null);
59+
5660
@Test
5761
public void testConstructLongPollThreads() throws Exception {
5862
// long poll threads == read threads
5963
ServerConfiguration conf = new ServerConfiguration();
6064
try (BookieRequestProcessor processor = new BookieRequestProcessor(
61-
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
65+
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
66+
channelGroup)) {
6267
assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
6368
}
6469

6570
// force create long poll threads if there is no read threads
6671
conf = new ServerConfiguration();
6772
conf.setNumReadWorkerThreads(0);
6873
try (BookieRequestProcessor processor = new BookieRequestProcessor(
69-
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
74+
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
75+
channelGroup)) {
7076
assertNull(processor.getReadThreadPool());
7177
assertNotNull(processor.getLongPollThreadPool());
7278
}
@@ -76,7 +82,8 @@ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocat
7682
conf.setNumReadWorkerThreads(2);
7783
conf.setNumLongPollWorkerThreads(2);
7884
try (BookieRequestProcessor processor = new BookieRequestProcessor(
79-
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
85+
conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT,
86+
channelGroup)) {
8087
assertNotNull(processor.getReadThreadPool());
8188
assertNotNull(processor.getLongPollThreadPool());
8289
assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());

0 commit comments

Comments
 (0)