Skip to content

Commit cd84b93

Browse files
authored
[fix] [ml] fix wrong cursor state if repeat do close (apache#18340)
1 parent 8ad7157 commit cd84b93

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2623,8 +2623,8 @@ public void operationFailed(MetaStoreException e) {
26232623

26242624
@Override
26252625
public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
2626-
State oldState = STATE_UPDATER.getAndSet(this, State.Closing);
2627-
if (oldState == State.Closed || oldState == State.Closing) {
2626+
boolean alreadyClosing = !trySetStateToClosing();
2627+
if (alreadyClosing) {
26282628
log.info("[{}] [{}] State is already closed", ledger.getName(), name);
26292629
callback.closeComplete(ctx);
26302630
return;
@@ -2707,6 +2707,28 @@ public void operationFailed(ManagedLedgerException exception) {
27072707
});
27082708
}
27092709

2710+
/**
2711+
* Try set {@link #state} to {@link State#Closing}.
2712+
* @return false if the {@link #state} already is {@link State#Closing} or {@link State#Closed}.
2713+
*/
2714+
private boolean trySetStateToClosing() {
2715+
final AtomicBoolean notClosing = new AtomicBoolean(false);
2716+
STATE_UPDATER.updateAndGet(this, state -> {
2717+
switch (state){
2718+
case Closing:
2719+
case Closed: {
2720+
notClosing.set(false);
2721+
return state;
2722+
}
2723+
default: {
2724+
notClosing.set(true);
2725+
return State.Closing;
2726+
}
2727+
}
2728+
});
2729+
return notClosing.get();
2730+
}
2731+
27102732
private void flushPendingMarkDeletes() {
27112733
if (!pendingMarkDeleteOps.isEmpty()) {
27122734
internalFlushPendingMarkDeletes();

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,21 @@ public void testCloseCursor() throws Exception {
143143
ledger.close();
144144
}
145145

146+
@Test
147+
public void testRepeatCloseCursor() throws Exception {
148+
ManagedLedgerConfig config = new ManagedLedgerConfig();
149+
config.setMaxUnackedRangesToPersistInMetadataStore(0);
150+
config.setThrottleMarkDelete(0);
151+
ManagedLedger ledger = factory.open("my_test_ledger", config);
152+
final ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
153+
cursor.close();
154+
cursor.close();
155+
assertEquals(cursor.getState(), ManagedCursorImpl.State.Closed.toString());
156+
// cleanup, "ledger.close" will trigger another "cursor.close"
157+
ledger.close();
158+
assertEquals(cursor.getState(), ManagedCursorImpl.State.Closed.toString());
159+
}
160+
146161
private static void closeCursorLedger(ManagedCursorImpl managedCursor) {
147162
Awaitility.await().until(managedCursor::closeCursorLedger);
148163
}

0 commit comments

Comments
 (0)