Skip to content

Commit fc9e8bf

Browse files
authored
[fix] [ml] messagesConsumedCounter of NonDurableCursor was initialized incorrectly (apache#19355)
1 parent 4165090 commit fc9e8bf

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private void recoverCursor(PositionImpl mdPosition) {
7575
// Initialize the counter such that the difference between the messages written on the ML and the
7676
// messagesConsumed is equal to the current backlog (negated).
7777
if (null != this.readPosition) {
78-
long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) < 0
78+
long initialBacklog = readPosition.compareTo(lastEntryAndCounter.getLeft()) <= 0
7979
? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.getLeft())) : 0;
8080
messagesConsumedCounter = lastEntryAndCounter.getRight() - initialBacklog;
8181
} else {

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,5 +824,20 @@ void deleteNonDurableCursorWithName() throws Exception {
824824
assertEquals(Iterables.size(ledger.getCursors()), 0);
825825
}
826826

827+
@Test
828+
public void testMessagesConsumedCounterInitializedCorrect() throws Exception {
829+
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testMessagesConsumedCounterInitializedCorrect",
830+
new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1));
831+
Position position = ledger.addEntry("1".getBytes(Encoding));
832+
NonDurableCursorImpl cursor = (NonDurableCursorImpl) ledger.newNonDurableCursor(PositionImpl.EARLIEST);
833+
cursor.delete(position);
834+
assertEquals(cursor.getMessagesConsumedCounter(), 1);
835+
assertTrue(cursor.getMessagesConsumedCounter() <= ledger.getEntriesAddedCounter());
836+
// cleanup.
837+
cursor.close();
838+
ledger.close();
839+
}
840+
841+
827842
private static final Logger log = LoggerFactory.getLogger(NonDurableCursorTest.class);
828843
}

0 commit comments

Comments
 (0)