Skip to content

Commit 3314d70

Browse files
authored
[fix] [ml] topic load fail by ledger lost (apache#19444)
Ensures that a ledger can be deleted from BookKeeper only after it has been removed from the managed ledger's ledger-info metadata.
1 parent 02b25a3 commit 3314d70

File tree

3 files changed

+135
-11
lines changed

3 files changed

+135
-11
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.function.Predicate;
6969
import java.util.function.Supplier;
7070
import java.util.stream.Collectors;
71+
import javax.annotation.Nullable;
7172
import lombok.Getter;
7273
import org.apache.bookkeeper.client.AsyncCallback;
7374
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -471,6 +472,7 @@ protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLe
471472
}
472473

473474
// Calculate total entries and size
475+
final List<Long> emptyLedgersToBeDeleted = Collections.synchronizedList(new ArrayList<>());
474476
Iterator<LedgerInfo> iterator = ledgers.values().iterator();
475477
while (iterator.hasNext()) {
476478
LedgerInfo li = iterator.next();
@@ -479,9 +481,7 @@ protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLe
479481
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
480482
} else {
481483
iterator.remove();
482-
bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> {
483-
log.info("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc);
484-
}, null);
484+
emptyLedgersToBeDeleted.add(li.getLedgerId());
485485
}
486486
}
487487

@@ -497,6 +497,11 @@ protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLe
497497
@Override
498498
public void operationComplete(Void v, Stat stat) {
499499
ledgersStat = stat;
500+
emptyLedgersToBeDeleted.forEach(ledgerId -> {
501+
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
502+
log.info("[{}] Deleted empty ledger ledgerId={} rc={}", name, ledgerId, rc);
503+
}, null);
504+
});
500505
initializeCursors(callback);
501506
}
502507

@@ -1551,11 +1556,12 @@ public void operationComplete(Void v, Stat stat) {
15511556
}
15521557
ledgersStat = stat;
15531558
synchronized (ManagedLedgerImpl.this) {
1559+
LedgerHandle originalCurrentLedger = currentLedger;
15541560
ledgers.put(lh.getId(), newLedger);
15551561
currentLedger = lh;
15561562
currentLedgerEntries = 0;
15571563
currentLedgerSize = 0;
1558-
updateLedgersIdsComplete();
1564+
updateLedgersIdsComplete(originalCurrentLedger);
15591565
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
15601566
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
15611567
}
@@ -1646,8 +1652,15 @@ void createNewOpAddEntryForNewLedger() {
16461652
} while (existsOp != null && --pendingSize > 0);
16471653
}
16481654

1649-
protected synchronized void updateLedgersIdsComplete() {
1655+
protected synchronized void updateLedgersIdsComplete(@Nullable LedgerHandle originalCurrentLedger) {
16501656
STATE_UPDATER.set(this, State.LedgerOpened);
1657+
// Delete original "currentLedger" if it has been removed from "ledgers".
1658+
if (originalCurrentLedger != null && !ledgers.containsKey(originalCurrentLedger.getId())){
1659+
bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc, ctx) -> {
1660+
mbean.endDataLedgerDeleteOp();
1661+
log.info("[{}] Delete complete for empty ledger {}. rc={}", name, originalCurrentLedger.getId(), rc);
1662+
}, null);
1663+
}
16511664
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
16521665

16531666
if (log.isDebugEnabled()) {
@@ -1710,10 +1723,6 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
17101723
// The last ledger was empty, so we can discard it
17111724
ledgers.remove(lh.getId());
17121725
mbean.startDataLedgerDeleteOp();
1713-
bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
1714-
mbean.endDataLedgerDeleteOp();
1715-
log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc);
1716-
}, null);
17171726
}
17181727

17191728
trimConsumedLedgersInBackground();

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.bookkeeper.client.AsyncCallback;
3131
import org.apache.bookkeeper.client.BKException;
3232
import org.apache.bookkeeper.client.BookKeeper;
33+
import org.apache.bookkeeper.client.LedgerHandle;
3334
import org.apache.bookkeeper.common.util.OrderedScheduler;
3435
import org.apache.bookkeeper.mledger.AsyncCallbacks;
3536
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -332,7 +333,7 @@ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLe
332333
currentLedgerEntries = 0;
333334
currentLedgerSize = 0;
334335
initLastConfirmedEntry();
335-
updateLedgersIdsComplete();
336+
updateLedgersIdsComplete(null);
336337
maybeUpdateCursorBeforeTrimmingConsumedLedger();
337338
} else if (isNoSuchLedgerExistsException(rc)) {
338339
log.warn("[{}] Source ledger not found: {}", name, lastLedgerId);
@@ -365,7 +366,7 @@ public synchronized void asyncClose(AsyncCallbacks.CloseCallback callback, Objec
365366
}
366367

367368
@Override
368-
protected synchronized void updateLedgersIdsComplete() {
369+
protected synchronized void updateLedgersIdsComplete(LedgerHandle originalCurrentLedger) {
369370
STATE_UPDATER.set(this, State.LedgerOpened);
370371
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
371372

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.nio.charset.Charset;
4949
import java.nio.charset.StandardCharsets;
5050
import java.security.GeneralSecurityException;
51+
import java.time.Duration;
5152
import java.util.ArrayList;
5253
import java.util.Arrays;
5354
import java.util.Collections;
@@ -75,7 +76,9 @@
7576
import java.util.concurrent.atomic.AtomicReference;
7677
import java.util.function.Predicate;
7778
import lombok.Cleanup;
79+
import lombok.Data;
7880
import lombok.extern.slf4j.Slf4j;
81+
import org.apache.bookkeeper.client.AsyncCallback;
7982
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
8083
import org.apache.bookkeeper.client.BKException;
8184
import org.apache.bookkeeper.client.BookKeeper;
@@ -144,6 +147,117 @@ public Object[][] checkOwnershipFlagProvider() {
144147
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
145148
}
146149

150+
private void makeAddEntryTimeout(ManagedLedgerImpl ml, AtomicBoolean addEntryFinished) throws Exception {
151+
LedgerHandle currentLedger = ml.currentLedger;
152+
final LedgerHandle spyLedgerHandle = spy(currentLedger);
153+
doAnswer(invocation -> {
154+
ByteBuf bs = (ByteBuf) invocation.getArguments()[0];
155+
AddCallback addCallback = (AddCallback) invocation.getArguments()[1];
156+
Object originalContext = invocation.getArguments()[2];
157+
currentLedger.asyncAddEntry(bs, (rc, lh, entryId, ctx) -> {
158+
addEntryFinished.set(true);
159+
addCallback.addComplete(BKException.Code.TimeoutException, spyLedgerHandle, -1, ctx);
160+
}, originalContext);
161+
return null;
162+
}).when(spyLedgerHandle).asyncAddEntry(any(ByteBuf.class), any(AddCallback.class), any());
163+
ml.currentLedger = spyLedgerHandle;
164+
}
165+
166+
@Data
167+
private static class DeleteLedgerInfo{
168+
volatile boolean hasCalled;
169+
volatile CompletableFuture<Void> future = new CompletableFuture<>();
170+
}
171+
172+
private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final AtomicBoolean signal,
173+
BookKeeper spyBookKeeper) {
174+
DeleteLedgerInfo deleteLedgerInfo = new DeleteLedgerInfo();
175+
doAnswer(invocation -> {
176+
long ledgerId = (long) invocation.getArguments()[0];
177+
AsyncCallback.DeleteCallback originalCb = (AsyncCallback.DeleteCallback) invocation.getArguments()[1];
178+
AsyncCallback.DeleteCallback cb = (rc, ctx) -> {
179+
if (deleteLedgerInfo.hasCalled) {
180+
deleteLedgerInfo.future.complete(null);
181+
}
182+
originalCb.deleteComplete(rc, ctx);
183+
};
184+
Object ctx = invocation.getArguments()[2];
185+
if (ledgerId != ledger.getId()){
186+
bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
187+
} else {
188+
deleteLedgerInfo.hasCalled = true;
189+
new Thread(() -> {
190+
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
191+
bkc.asyncDeleteLedger(ledgerId, cb, ctx);
192+
}).start();
193+
}
194+
return null;
195+
}).when(spyBookKeeper).asyncDeleteLedger(any(long.class), any(AsyncCallback.DeleteCallback.class), any());
196+
return deleteLedgerInfo;
197+
}
198+
199+
/***
200+
* This test simulates the following problems that can occur when ZK connections are unstable:
201+
* - add entry timeout
202+
* - the ZK write fails when updating the ledger info of the ML
203+
* and verifies that ledger info of ML is still correct when the above problems occur.
204+
*/
205+
@Test
206+
public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
207+
String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
208+
BookKeeper spyBookKeeper = spy(bkc);
209+
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
210+
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);
211+
212+
// Make the add-entry operation time out (the data write itself actually succeeds).
213+
AtomicBoolean addEntryFinished = new AtomicBoolean(false);
214+
makeAddEntryTimeout(ml, addEntryFinished);
215+
216+
// Make the ledger info update fail when switching ledgers.
217+
metadataStore.failConditional(new MetadataStoreException.BadVersionException(""), (opType, path) -> {
218+
if (opType == FaultInjectionMetadataStore.OperationType.PUT && addEntryFinished.get()
219+
&& "/managed-ledgers/testLedgerInfoMetaCorrectIfAddEntryTimeOut".equals(path)) {
220+
return true;
221+
}
222+
return false;
223+
});
224+
225+
// Delay the ledger deletion if a delete is triggered.
226+
AtomicBoolean deleteLedgerDelaySignal = new AtomicBoolean(false);
227+
DeleteLedgerInfo deleteLedgerInfo =
228+
makeDelayIfDoLedgerDelete(ml.currentLedger, deleteLedgerDelaySignal, spyBookKeeper);
229+
230+
// Add one entry.
231+
// - it will fail and trigger ledger switch(we mocked the error).
232+
// - ledger switch will also fail(we mocked the error).
233+
try {
234+
ml.addEntry("1".getBytes(Charset.defaultCharset()));
235+
fail("Expected the operation of add entry will fail by timeout or ledger fenced.");
236+
} catch (Exception e){
237+
// expected ex.
238+
}
239+
240+
// Reopen ML.
241+
try {
242+
ml.close();
243+
fail("Expected the operation of ml close will fail by fenced state.");
244+
} catch (Exception e){
245+
// expected ex.
246+
}
247+
ManagedLedgerImpl mlReopened = (ManagedLedgerImpl) factory.open(mlName);
248+
deleteLedgerDelaySignal.set(true);
249+
if (deleteLedgerInfo.hasCalled){
250+
deleteLedgerInfo.future.join();
251+
}
252+
mlReopened.close();
253+
254+
// Verify: every ledger listed in the ledger info can still be opened.
255+
for (long ledgerId : mlReopened.getLedgersInfo().keySet()){
256+
LedgerHandle lh = bkc.openLedger(ledgerId, ml.digestType, ml.getConfig().getPassword());
257+
lh.close();
258+
}
259+
}
260+
147261
@Test
148262
public void managedLedgerApi() throws Exception {
149263
ManagedLedger ledger = factory.open("my_test_ledger");

0 commit comments

Comments
 (0)