Skip to content

Commit 1e02853

Browse files
authored
Avoid thread-local state when computing CRCs (apache#3811)
### Motivation In `DigestManager` there are several accesses to ThreadLocal variable per each entry processed. The reason is the mainly due to `DigestManager` API which exposes a stateful `update()` method which can be invoked multiple times and keeps the current checksum as a thread-local variable. If we exclude MAC digest which is 20 bytes, for other digests we can instead keep the current checksum in a local variable and pass it each time, avoiding all the thread-locals and also the need for writing the checksum result into a buffer. ### Benchmarks #### Before apache#3810 ``` Benchmark (entrySize) Mode Cnt Score Error Units DigestManagerBenchmark.verifyDigest 64 thrpt 3 13.450 ± 3.634 ops/us DigestManagerBenchmark.verifyDigest 1024 thrpt 3 7.908 ± 2.637 ops/us DigestManagerBenchmark.verifyDigest 4086 thrpt 3 3.233 ± 0.882 ops/us DigestManagerBenchmark.verifyDigest 8192 thrpt 3 1.846 ± 0.047 ops/us ``` #### After apache#3810 ``` Benchmark (entrySize) Mode Cnt Score Error Units DigestManagerBenchmark.verifyDigest 64 thrpt 3 46.312 ± 7.414 ops/us DigestManagerBenchmark.verifyDigest 1024 thrpt 3 13.379 ± 1.069 ops/us DigestManagerBenchmark.verifyDigest 4086 thrpt 3 3.787 ± 0.059 ops/us DigestManagerBenchmark.verifyDigest 8192 thrpt 3 1.956 ± 0.052 ops/us ``` #### After this change ``` Benchmark (entrySize) Mode Cnt Score Error Units DigestManagerBenchmark.verifyDigest 64 thrpt 3 130.108 ± 4.854 ops/us DigestManagerBenchmark.verifyDigest 1024 thrpt 3 17.744 ± 0.238 ops/us DigestManagerBenchmark.verifyDigest 4086 thrpt 3 4.104 ± 0.181 ops/us DigestManagerBenchmark.verifyDigest 8192 thrpt 3 2.050 ± 0.066 ops/us ```
1 parent 1f8de8f commit 1e02853

File tree

6 files changed

+84
-56
lines changed

6 files changed

+84
-56
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,11 @@
2121
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
2222
import io.netty.buffer.ByteBuf;
2323
import io.netty.buffer.ByteBufAllocator;
24-
import io.netty.util.concurrent.FastThreadLocal;
2524
import lombok.extern.slf4j.Slf4j;
26-
import org.apache.commons.lang3.mutable.MutableInt;
2725

2826
@Slf4j
2927
class CRC32CDigestManager extends DigestManager {
3028

31-
private static final FastThreadLocal<MutableInt> currentCrc = new FastThreadLocal<MutableInt>() {
32-
@Override
33-
protected MutableInt initialValue() throws Exception {
34-
return new MutableInt(0);
35-
}
36-
};
37-
3829
public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
3930
super(ledgerId, useV2Protocol, allocator);
4031
}
@@ -45,16 +36,17 @@ int getMacCodeLength() {
4536
}
4637

4738
@Override
48-
void populateValueAndReset(ByteBuf buf) {
49-
MutableInt current = currentCrc.get();
50-
buf.writeInt(current.intValue());
51-
current.setValue(0);
39+
boolean isInt32Digest() {
40+
return true;
41+
}
42+
43+
@Override
44+
void populateValueAndReset(int digest, ByteBuf buf) {
45+
buf.writeInt(digest);
5246
}
5347

5448
@Override
55-
void update(ByteBuf data, int offset, int len) {
56-
MutableInt current = currentCrc.get();
57-
final int lastCrc = current.intValue();
58-
current.setValue(Crc32cIntChecksum.resumeChecksum(lastCrc, data, offset, len));
49+
int update(int digest, ByteBuf data, int offset, int len) {
50+
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
5951
}
6052
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,19 @@ int getMacCodeLength() {
5757
}
5858

5959
@Override
60-
void populateValueAndReset(ByteBuf buf) {
60+
void populateValueAndReset(int digest, ByteBuf buf) {
6161
buf.writeLong(crc.get().getValueAndReset());
6262
}
6363

6464
@Override
65-
void update(ByteBuf data, int offset, int len) {
65+
int update(int digest, ByteBuf data, int offset, int len) {
6666
crc.get().update(data, offset, len);
67+
return 0;
68+
}
69+
70+
@Override
71+
boolean isInt32Digest() {
72+
// This is stored as 8 bytes
73+
return false;
6774
}
6875
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,11 @@ public abstract class DigestManager {
5353

5454
abstract int getMacCodeLength();
5555

56-
void update(byte[] data) {
57-
update(Unpooled.wrappedBuffer(data), 0, data.length);
58-
}
56+
abstract int update(int digest, ByteBuf buffer, int offset, int len);
5957

60-
abstract void update(ByteBuf buffer, int offset, int len);
58+
abstract void populateValueAndReset(int digest, ByteBuf buffer);
6159

62-
abstract void populateValueAndReset(ByteBuf buffer);
60+
abstract boolean isInt32Digest();
6361

6462
final int macCodeLength;
6563

@@ -112,7 +110,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
112110
headersBuffer.writeLong(lastAddConfirmed);
113111
headersBuffer.writeLong(length);
114112

115-
update(headersBuffer, 0, METADATA_LENGTH);
113+
int digest = update(0, headersBuffer, 0, METADATA_LENGTH);
116114

117115
// don't unwrap slices
118116
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
@@ -121,11 +119,15 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
121119
ReferenceCountUtil.safeRelease(data);
122120

123121
if (unwrapped instanceof CompositeByteBuf) {
124-
((CompositeByteBuf) unwrapped).forEach(b -> update(b, b.readerIndex(), b.readableBytes()));
122+
CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped);
123+
for (int i = 0; i < cbb.numComponents(); i++) {
124+
ByteBuf b = cbb.component(i);
125+
digest = update(digest, b, b.readerIndex(), b.readableBytes());
126+
}
125127
} else {
126-
update(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
128+
digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
127129
}
128-
populateValueAndReset(headersBuffer);
130+
populateValueAndReset(digest, headersBuffer);
129131

130132
return ByteBufList.get(headersBuffer, unwrapped);
131133
}
@@ -147,8 +149,8 @@ public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
147149
headersBuffer.writeLong(ledgerId);
148150
headersBuffer.writeLong(lac);
149151

150-
update(headersBuffer, 0, LAC_METADATA_LENGTH);
151-
populateValueAndReset(headersBuffer);
152+
int digest = update(0, headersBuffer, 0, LAC_METADATA_LENGTH);
153+
populateValueAndReset(digest, headersBuffer);
152154

153155
return ByteBufList.get(headersBuffer);
154156
}
@@ -183,18 +185,26 @@ private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryI
183185
this.getClass().getName(), dataReceived.readableBytes());
184186
throw new BKDigestMatchException();
185187
}
186-
update(dataReceived, 0, METADATA_LENGTH);
188+
int digest = update(0, dataReceived, 0, METADATA_LENGTH);
187189

188190
int offset = METADATA_LENGTH + macCodeLength;
189-
update(dataReceived, offset, dataReceived.readableBytes() - offset);
190-
191-
ByteBuf digest = DIGEST_BUFFER.get();
192-
digest.clear();
193-
populateValueAndReset(digest);
194-
195-
if (!ByteBufUtil.equals(digest, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
196-
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
197-
throw new BKDigestMatchException();
191+
digest = update(digest, dataReceived, offset, dataReceived.readableBytes() - offset);
192+
193+
if (isInt32Digest()) {
194+
int receivedDigest = dataReceived.getInt(METADATA_LENGTH);
195+
if (receivedDigest != digest) {
196+
logger.error("Digest mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
197+
throw new BKDigestMatchException();
198+
}
199+
} else {
200+
ByteBuf digestBuf = DIGEST_BUFFER.get();
201+
digestBuf.clear();
202+
populateValueAndReset(digest, digestBuf);
203+
204+
if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
205+
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
206+
throw new BKDigestMatchException();
207+
}
198208
}
199209

200210
long actualLedgerId = dataReceived.readLong();
@@ -223,17 +233,25 @@ public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchE
223233
throw new BKDigestMatchException();
224234
}
225235

226-
update(dataReceived, 0, LAC_METADATA_LENGTH);
236+
int digest = update(0, dataReceived, 0, LAC_METADATA_LENGTH);
227237

228-
ByteBuf digest = DIGEST_BUFFER.get();
229-
digest.clear();
230-
231-
populateValueAndReset(digest);
232-
233-
if (!ByteBufUtil.equals(digest, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
234-
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
235-
throw new BKDigestMatchException();
238+
if (isInt32Digest()) {
239+
int receivedDigest = dataReceived.getInt(LAC_METADATA_LENGTH);
240+
if (receivedDigest != digest) {
241+
logger.error("Digest mismatch for ledger-id LAC: " + ledgerId);
242+
throw new BKDigestMatchException();
243+
}
244+
} else {
245+
ByteBuf digestBuf = DIGEST_BUFFER.get();
246+
digestBuf.clear();
247+
populateValueAndReset(digest, digestBuf);
248+
249+
if (!ByteBufUtil.equals(digestBuf, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
250+
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
251+
throw new BKDigestMatchException();
252+
}
236253
}
254+
237255
long actualLedgerId = dataReceived.readLong();
238256
long lac = dataReceived.readLong();
239257
if (actualLedgerId != ledgerId) {

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,15 @@ int getMacCodeLength() {
3838
}
3939

4040
@Override
41-
void update(ByteBuf buffer, int offset, int len) {}
41+
int update(int digest, ByteBuf buffer, int offset, int len) {
42+
return 0;
43+
}
4244

4345
@Override
44-
void populateValueAndReset(ByteBuf buffer) {}
46+
void populateValueAndReset(int digest, ByteBuf buffer) {}
47+
48+
@Override
49+
boolean isInt32Digest() {
50+
return false;
51+
}
4552
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,18 @@ int getMacCodeLength() {
9191

9292

9393
@Override
94-
void populateValueAndReset(ByteBuf buffer) {
94+
void populateValueAndReset(int digest, ByteBuf buffer) {
9595
buffer.writeBytes(mac.get().doFinal());
9696
}
9797

9898
@Override
99-
void update(ByteBuf data, int offset, int len) {
99+
int update(int digest, ByteBuf data, int offset, int len) {
100100
mac.get().update(data.slice(offset, len).nioBuffer());
101+
return 0;
101102
}
102103

103-
104+
@Override
105+
boolean isInt32Digest() {
106+
return false;
107+
}
104108
}

microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,9 @@ public DigestManager getDigestManager(Digest digest) {
172172
public void digestManager(MyState state) {
173173
final ByteBuf buff = state.getByteBuff(state.bufferType);
174174
final DigestManager dm = state.getDigestManager(state.digest);
175-
dm.update(buff, 0, buff.readableBytes());
175+
int digest = dm.update(0, buff, 0, buff.readableBytes());
176176
state.digestBuf.clear();
177-
dm.populateValueAndReset(state.digestBuf);
177+
dm.populateValueAndReset(digest, state.digestBuf);
178178
}
179179

180180
}

0 commit comments

Comments
 (0)