Skip to content

Commit 1f8de8f

Browse files
authored
Improved efficiency in DigestManager.verify() (apache#3810)
1 parent f65b72d commit 1f8de8f

File tree

15 files changed

+204
-63
lines changed

15 files changed

+204
-63
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ void populateValueAndReset(ByteBuf buf) {
5252
}
5353

5454
@Override
55-
void update(ByteBuf data) {
55+
void update(ByteBuf data, int offset, int len) {
5656
MutableInt current = currentCrc.get();
5757
final int lastCrc = current.intValue();
58-
current.setValue(Crc32cIntChecksum.resumeChecksum(lastCrc, data));
58+
current.setValue(Crc32cIntChecksum.resumeChecksum(lastCrc, data, offset, len));
5959
}
6060
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class CRC32DigestManager extends DigestManager {
3333
interface CRC32Digest {
3434
long getValueAndReset();
3535

36-
void update(ByteBuf buf);
36+
void update(ByteBuf buf, int offset, int len);
3737
}
3838

3939
private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
@@ -62,7 +62,7 @@ void populateValueAndReset(ByteBuf buf) {
6262
}
6363

6464
@Override
65-
void update(ByteBuf data) {
66-
crc.get().update(data);
65+
void update(ByteBuf data, int offset, int len) {
66+
crc.get().update(data, offset, len);
6767
}
6868
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
import io.netty.buffer.ByteBuf;
2121
import io.netty.buffer.ByteBufAllocator;
22+
import io.netty.buffer.ByteBufUtil;
2223
import io.netty.buffer.CompositeByteBuf;
24+
import io.netty.buffer.PooledByteBufAllocator;
2325
import io.netty.buffer.Unpooled;
2426
import io.netty.util.ReferenceCountUtil;
27+
import io.netty.util.concurrent.FastThreadLocal;
2528
import java.security.GeneralSecurityException;
2629
import java.security.NoSuchAlgorithmException;
2730
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
@@ -51,10 +54,10 @@ public abstract class DigestManager {
5154
abstract int getMacCodeLength();
5255

5356
void update(byte[] data) {
54-
update(Unpooled.wrappedBuffer(data, 0, data.length));
57+
update(Unpooled.wrappedBuffer(data), 0, data.length);
5558
}
5659

57-
abstract void update(ByteBuf buffer);
60+
abstract void update(ByteBuf buffer, int offset, int len);
5861

5962
abstract void populateValueAndReset(ByteBuf buffer);
6063

@@ -109,7 +112,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
109112
headersBuffer.writeLong(lastAddConfirmed);
110113
headersBuffer.writeLong(length);
111114

112-
update(headersBuffer);
115+
update(headersBuffer, 0, METADATA_LENGTH);
113116

114117
// don't unwrap slices
115118
final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf
@@ -118,9 +121,9 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
118121
ReferenceCountUtil.safeRelease(data);
119122

120123
if (unwrapped instanceof CompositeByteBuf) {
121-
((CompositeByteBuf) unwrapped).forEach(this::update);
124+
((CompositeByteBuf) unwrapped).forEach(b -> update(b, b.readerIndex(), b.readableBytes()));
122125
} else {
123-
update(unwrapped);
126+
update(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
124127
}
125128
populateValueAndReset(headersBuffer);
126129

@@ -144,7 +147,7 @@ public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
144147
headersBuffer.writeLong(ledgerId);
145148
headersBuffer.writeLong(lac);
146149

147-
update(headersBuffer);
150+
update(headersBuffer, 0, LAC_METADATA_LENGTH);
148151
populateValueAndReset(headersBuffer);
149152

150153
return ByteBufList.get(headersBuffer);
@@ -158,6 +161,18 @@ private void verifyDigest(long entryId, ByteBuf dataReceived) throws BKDigestMat
158161
verifyDigest(entryId, dataReceived, false);
159162
}
160163

164+
private static final FastThreadLocal<ByteBuf> DIGEST_BUFFER = new FastThreadLocal<ByteBuf>() {
165+
@Override
166+
protected ByteBuf initialValue() throws Exception {
167+
return PooledByteBufAllocator.DEFAULT.directBuffer(1024);
168+
}
169+
170+
@Override
171+
protected void onRemoval(ByteBuf value) throws Exception {
172+
value.release();
173+
}
174+
};
175+
161176
private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryIdCheck)
162177
throws BKDigestMatchException {
163178

@@ -168,21 +183,18 @@ private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryI
168183
this.getClass().getName(), dataReceived.readableBytes());
169184
throw new BKDigestMatchException();
170185
}
171-
update(dataReceived.slice(0, METADATA_LENGTH));
186+
update(dataReceived, 0, METADATA_LENGTH);
172187

173188
int offset = METADATA_LENGTH + macCodeLength;
174-
update(dataReceived.slice(offset, dataReceived.readableBytes() - offset));
189+
update(dataReceived, offset, dataReceived.readableBytes() - offset);
175190

176-
ByteBuf digest = allocator.buffer(macCodeLength);
191+
ByteBuf digest = DIGEST_BUFFER.get();
192+
digest.clear();
177193
populateValueAndReset(digest);
178194

179-
try {
180-
if (digest.compareTo(dataReceived.slice(METADATA_LENGTH, macCodeLength)) != 0) {
181-
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
182-
throw new BKDigestMatchException();
183-
}
184-
} finally {
185-
ReferenceCountUtil.safeRelease(digest);
195+
if (!ByteBufUtil.equals(digest, 0, dataReceived, METADATA_LENGTH, macCodeLength)) {
196+
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
197+
throw new BKDigestMatchException();
186198
}
187199

188200
long actualLedgerId = dataReceived.readLong();
@@ -211,20 +223,17 @@ public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchE
211223
throw new BKDigestMatchException();
212224
}
213225

214-
update(dataReceived.slice(0, LAC_METADATA_LENGTH));
226+
update(dataReceived, 0, LAC_METADATA_LENGTH);
215227

216-
ByteBuf digest = allocator.buffer(macCodeLength);
217-
try {
218-
populateValueAndReset(digest);
228+
ByteBuf digest = DIGEST_BUFFER.get();
229+
digest.clear();
219230

220-
if (digest.compareTo(dataReceived.slice(LAC_METADATA_LENGTH, macCodeLength)) != 0) {
221-
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
222-
throw new BKDigestMatchException();
223-
}
224-
} finally {
225-
ReferenceCountUtil.safeRelease(digest);
226-
}
231+
populateValueAndReset(digest);
227232

233+
if (!ByteBufUtil.equals(digest, 0, dataReceived, LAC_METADATA_LENGTH, macCodeLength)) {
234+
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
235+
throw new BKDigestMatchException();
236+
}
228237
long actualLedgerId = dataReceived.readLong();
229238
long lac = dataReceived.readLong();
230239
if (actualLedgerId != ledgerId) {

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@ public long getValueAndReset() {
4343
}
4444

4545
@Override
46-
public void update(ByteBuf buf) {
47-
int index = buf.readerIndex();
48-
int length = buf.readableBytes();
49-
46+
public void update(ByteBuf buf, int index, int length) {
5047
try {
5148
if (buf.hasMemoryAddress()) {
5249
// Calculate CRC directly from the direct memory pointer

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ int getMacCodeLength() {
3838
}
3939

4040
@Override
41-
void update(ByteBuf buffer) {}
41+
void update(ByteBuf buffer, int offset, int len) {}
4242

4343
@Override
4444
void populateValueAndReset(ByteBuf buffer) {}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ void populateValueAndReset(ByteBuf buffer) {
9696
}
9797

9898
@Override
99-
void update(ByteBuf data) {
100-
mac.get().update(data.nioBuffer());
99+
void update(ByteBuf data, int offset, int len) {
100+
mac.get().update(data.slice(offset, len).nioBuffer());
101101
}
102102

103103

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public long getValueAndReset() {
3636
}
3737

3838
@Override
39-
public void update(ByteBuf buf) {
40-
crc.update(buf.nioBuffer());
39+
public void update(ByteBuf buf, int offset, int len) {
40+
crc.update(buf.slice(offset, len).nioBuffer());
4141
}
4242
}

circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public static int computeChecksum(ByteBuf payload) {
5353
* @param payload
5454
* @return
5555
*/
56-
public static int resumeChecksum(int previousChecksum, ByteBuf payload) {
57-
return CRC32C_HASH.resume(previousChecksum, payload);
56+
public static int resumeChecksum(int previousChecksum, ByteBuf payload, int offset, int len) {
57+
return CRC32C_HASH.resume(previousChecksum, payload, offset, len);
5858
}
5959

6060
}

circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,10 @@
2222

2323
public interface IntHash {
2424
int calculate(ByteBuf buffer);
25+
26+
int calculate(ByteBuf buffer, int offset, int len);
27+
2528
int resume(int current, ByteBuf buffer);
29+
30+
int resume(int current, ByteBuf buffer, int offset, int len);
2631
}

circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,21 @@ public int calculate(ByteBuf buffer) {
3636

3737
@Override
3838
public int resume(int current, ByteBuf buffer) {
39+
return resume(current, buffer, buffer.readerIndex(), buffer.readableBytes());
40+
}
41+
42+
@Override
43+
public int calculate(ByteBuf buffer, int offset, int len) {
44+
return resume(0, buffer, offset, len);
45+
}
46+
47+
@Override
48+
public int resume(int current, ByteBuf buffer, int offset, int len) {
3949
if (buffer.hasArray()) {
40-
return hash.resume(current, buffer.array(), buffer.arrayOffset() + buffer.readerIndex(),
41-
buffer.readableBytes());
50+
return hash.resume(current, buffer.array(), buffer.arrayOffset() + offset,
51+
len);
4252
} else {
43-
return hash.resume(current, buffer.nioBuffer());
53+
return hash.resume(current, buffer.slice(offset, len).nioBuffer());
4454
}
4555
}
4656
}

0 commit comments

Comments
 (0)