Skip to content

Commit a0372f4

Browse files
authored
[fix][broker] Fix IndexOutOfBoundsException in the CompactedTopicUtils (apache#20887)
1 parent ffb9d30 commit a0372f4

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
8585
}
8686
cursor.seek(seekToPosition);
8787
callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
88+
return;
8889
}
8990

9091
Entry lastEntry = entries.get(entries.size() - 1);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.compaction;
20+
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
27+
import org.apache.bookkeeper.mledger.Entry;
28+
import org.apache.bookkeeper.mledger.ManagedCursor;
29+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
30+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
31+
import org.mockito.Mockito;
32+
import org.testng.Assert;
33+
import org.testng.annotations.Test;
34+
35+
public class CompactedTopicUtilsTest {
36+
37+
@Test
38+
public void testReadCompactedEntriesWithEmptyEntries() throws ExecutionException, InterruptedException {
39+
PositionImpl lastCompactedPosition = PositionImpl.get(1, 100);
40+
TopicCompactionService service = Mockito.mock(TopicCompactionService.class);
41+
Mockito.doReturn(CompletableFuture.completedFuture(Collections.emptyList()))
42+
.when(service).readCompactedEntries(Mockito.any(), Mockito.intThat(argument -> argument > 0));
43+
Mockito.doReturn(CompletableFuture.completedFuture(lastCompactedPosition)).when(service)
44+
.getLastCompactedPosition();
45+
46+
47+
PositionImpl initPosition = PositionImpl.get(1, 90);
48+
AtomicReference<PositionImpl> readPositionRef = new AtomicReference<>(initPosition.getNext());
49+
ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
50+
Mockito.doReturn(readPositionRef.get()).when(cursor).getReadPosition();
51+
Mockito.doAnswer(invocation -> {
52+
readPositionRef.set(invocation.getArgument(0));
53+
return null;
54+
}).when(cursor).seek(Mockito.any());
55+
56+
CompletableFuture<List<Entry>> completableFuture = new CompletableFuture<>();
57+
final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
58+
AsyncCallbacks.ReadEntriesCallback readEntriesCallback = new AsyncCallbacks.ReadEntriesCallback() {
59+
@Override
60+
public void readEntriesComplete(List<Entry> entries, Object ctx) {
61+
completableFuture.complete(entries);
62+
}
63+
64+
@Override
65+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
66+
completableFuture.completeExceptionally(exception);
67+
throwableRef.set(exception);
68+
}
69+
};
70+
71+
CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false,
72+
readEntriesCallback, false, null);
73+
74+
List<Entry> entries = completableFuture.get();
75+
Assert.assertTrue(entries.isEmpty());
76+
Assert.assertNull(throwableRef.get());
77+
Assert.assertEquals(readPositionRef.get(), lastCompactedPosition.getNext());
78+
}
79+
}

0 commit comments

Comments
 (0)