Skip to content

Commit 33f40f6

Browse files
authored
[fix] [ml] Fix the incorrect total size if use ML interceptor (apache#19404)
1 parent 0a69a43 commit 33f40f6

File tree

3 files changed

+135
-2
lines changed

3 files changed

+135
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,22 @@ public void setCloseWhenDone(boolean closeWhenDone) {
125125

126126
public void initiate() {
127127
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {
128-
129128
ByteBuf duplicateBuffer = data.retainedDuplicate();
130129

131130
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
132131
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
133132
lastInitTime = System.nanoTime();
134133
if (ml.getManagedLedgerInterceptor() != null) {
134+
long originalDataLen = data.readableBytes();
135135
payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this,
136136
duplicateBuffer);
137137
if (payloadProcessorHandle != null) {
138138
duplicateBuffer = payloadProcessorHandle.getProcessedPayload();
139+
// If data len of entry changes, correct "dataLength" and "currentLedgerSize".
140+
if (originalDataLen != duplicateBuffer.readableBytes()) {
141+
this.dataLength = duplicateBuffer.readableBytes();
142+
this.ml.currentLedgerSize += (dataLength - originalDataLen);
143+
}
139144
}
140145
}
141146
ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor;
23+
import java.util.HashSet;
24+
import java.util.Set;
25+
import lombok.extern.slf4j.Slf4j;
26+
import org.apache.bookkeeper.client.LedgerHandle;
27+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
28+
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
29+
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
30+
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
31+
import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest;
32+
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
33+
import org.awaitility.Awaitility;
34+
import org.testng.annotations.Test;
35+
36+
/***
37+
* Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified
38+
* by "default".
39+
*/
40+
@Slf4j
41+
@Test(groups = "broker")
42+
public class MangedLedgerInterceptorImplTest2 extends MockedBookKeeperTestCase {
43+
44+
public static void switchLedgerManually(ManagedLedgerImpl ledger){
45+
LedgerHandle originalLedgerHandle = ledger.currentLedger;
46+
ledger.ledgerClosed(ledger.currentLedger);
47+
ledger.createLedgerAfterClosed();
48+
Awaitility.await().until(() -> {
49+
return ledger.state == ManagedLedgerImpl.State.LedgerOpened && ledger.currentLedger != originalLedgerHandle;
50+
});
51+
}
52+
53+
@Test
54+
public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception {
55+
final String mlName = "ml1";
56+
final String cursorName = "cursor1";
57+
58+
// Registry interceptor.
59+
ManagedLedgerConfig config = new ManagedLedgerConfig();
60+
Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
61+
processors.add(new TestPayloadProcessor());
62+
ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors);
63+
config.setManagedLedgerInterceptor(interceptor);
64+
config.setMaxEntriesPerLedger(100);
65+
66+
// Add one entry.
67+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
68+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
69+
ledger.addEntry(new byte[1]);
70+
71+
// Mark "currentLedgerSize" and switch ledger.
72+
long currentLedgerSize = ledger.getCurrentLedgerSize();
73+
switchLedgerManually(ledger);
74+
75+
// verify.
76+
assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
77+
78+
// cleanup.
79+
cursor.close();
80+
ledger.close();
81+
factory.getEntryCacheManager().clear();
82+
factory.shutdown();
83+
config.setManagedLedgerInterceptor(null);
84+
}
85+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222
import io.netty.buffer.Unpooled;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CountDownLatch;
2425
import java.util.function.Predicate;
2526
import lombok.Cleanup;
@@ -32,6 +33,7 @@
3233
import org.apache.bookkeeper.mledger.Position;
3334
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
3435
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
36+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
3537
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
3638
import org.apache.bookkeeper.mledger.impl.PositionImpl;
3739
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
@@ -60,7 +62,7 @@
6062
public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
6163
private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
6264

63-
public class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
65+
public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
6466
@Override
6567
public Processor inputProcessor() {
6668
return new Processor() {
@@ -158,6 +160,47 @@ public void testMessagePayloadProcessor() throws Exception {
158160
config.setManagedLedgerInterceptor(null);
159161
}
160162

163+
@Test
164+
public void testTotalSizeCorrectIfHasInterceptor() throws Exception {
165+
final String mlName = "ml1";
166+
final String cursorName = "cursor1";
167+
168+
// Registry interceptor.
169+
ManagedLedgerConfig config = new ManagedLedgerConfig();
170+
Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
171+
processors.add(new TestPayloadProcessor());
172+
ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors);
173+
config.setManagedLedgerInterceptor(interceptor);
174+
config.setMaxEntriesPerLedger(2);
175+
176+
// Add many entries and consume.
177+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config);
178+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName);
179+
for (int i = 0; i < 5; i++){
180+
cursor.delete(ledger.addEntry(new byte[1]));
181+
}
182+
183+
// Trim ledgers.
184+
CompletableFuture<Void> trimLedgerFuture = new CompletableFuture<>();
185+
ledger.trimConsumedLedgersInBackground(trimLedgerFuture);
186+
trimLedgerFuture.join();
187+
188+
// verify.
189+
assertEquals(ledger.getTotalSize(), calculatePreciseSize(ledger));
190+
191+
// cleanup.
192+
cursor.close();
193+
ledger.close();
194+
factory.getEntryCacheManager().clear();
195+
factory.shutdown();
196+
config.setManagedLedgerInterceptor(null);
197+
}
198+
199+
public static long calculatePreciseSize(ManagedLedgerImpl ledger){
200+
return ledger.getLedgersInfo().values().stream()
201+
.map(info -> info.getSize()).reduce((l1,l2) -> l1 + l2).orElse(0L) + ledger.getCurrentLedgerSize();
202+
}
203+
161204
@Test(timeOut = 20000)
162205
public void testRecoveryIndex() throws Exception {
163206
final int MOCK_BATCH_SIZE = 2;

0 commit comments

Comments
 (0)