Skip to content

Commit 4139fef

Browse files
authored
[fix] [ml] Topics stats shows msgBacklog but there reality no backlog (apache#19275)
### Motivation #### 1. `readPosition` point to a deleted ledger When `trim ledgers` and `create new cursor` are executed concurrently, it will cause the `readPosition` of the cursor to point to a deleted ledger. | time | `trim ledgers` | `create new cursor` | | --- | --- | --- | | 1 | | set read position and mark deleted position | | 2 | delete ledger | | | 3 | | add the cursor to `ManagedLedger.cursors` | ---- #### 2. Backlog wrong caused by `readPosition` wrong <strong>(Highlight)</strong>Since the read position of the cursor is pointing at a deleted ledger, so deleted messages will never be consumed or acknowledged. Since the backlog in the API `topics stats` response is calculated as this: `managedLedger.entriesAddedCounter - cursor.messagesConsumedCounter`, the result is: Topics stats show `msgBacklog` but there is reality no backlog. - `managedLedger.entriesAddedCounter`: Pulsar will set it to `0` when creating a new managed ledger, it will increment when adding entries. - `cursor.messagesConsumedCounter`: Pulsar will set it to `0` when creating a new cursor, it will increment when acknowledging. For example: - write entries to the managed ledger: `{1:0~1:9}...{5:0~5:9}` - `managedLedger.entriesAddedCounter` is `50` now - create a new cursor, and set the read position to `1:0` - `cursor.messagesConsumedCounter` is `0` now - delete ledgers `1~4` - consume all messages - can only consume the messages {5:0~5:9}, so `cursor.messagesConsumedCounter` is `10` now - the `backlog` in response of `topics stats` is `50 - 10 = 40`, but there reality no backlog ---- #### 3. Reproduce issue Sorry, I spent 4 hours trying to write a non-invasive test, but failed. <strong>(Highlight)</strong>You can reproduce by `testBacklogIfCursorCreateConcurrentWithTrimLedger` in the PR apache#19274 https://github.com/apache/pulsar/blob/a2cdc759fc2710e4dd913eb0485d23ebcaa076a4/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/StatsBackLogTest.java#L163 ### Modifications Avoid the race condition of `cursor.initializeCursorPosition` and `internalTrimLedgers`
1 parent 86205a9 commit 4139fef

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -993,12 +993,11 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
993993
public void operationComplete() {
994994
log.info("[{}] Opened new cursor: {}", name, cursor);
995995
cursor.setActive();
996-
// Update the ack position (ignoring entries that were written while the cursor was being created)
997-
cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
998-
? getFirstPositionAndCounter()
999-
: getLastPositionAndCounter());
1000-
1001996
synchronized (ManagedLedgerImpl.this) {
997+
// Update the ack position (ignoring entries that were written while the cursor was being created)
998+
cursor.initializeCursorPosition(InitialPosition.Earliest == initialPosition
999+
? getFirstPositionAndCounter()
1000+
: getLastPositionAndCounter());
10021001
addCursor(cursor);
10031002
uninitializedCursors.remove(cursorName).complete(cursor);
10041003
}

0 commit comments

Comments
 (0)