Skip to content

Commit 9ccc562

Browse files
authored
[fix][broker] Pass bytesToRead when reading compacted entries (apache#20850)
1 parent b8e6948 commit 9ccc562

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ protected void readMoreEntries(Consumer consumer) {
350350
havePendingRead = true;
351351
if (consumer.readCompacted()) {
352352
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
353-
CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(), cursor, messagesToRead,
354-
readFromEarliest, this, consumer);
353+
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor,
354+
messagesToRead, bytesToRead, readFromEarliest, this, true, consumer);
355355
} else {
356356
ReadEntriesCtx readEntriesCtx =
357357
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ public interface CompactedTopic {
3333
/**
3434
* Read entries from compacted topic.
3535
*
36-
* @deprecated Use {@link CompactedTopicUtils#readCompactedEntries(TopicCompactionService, ManagedCursor,
37-
* int, boolean, ReadEntriesCallback, Consumer)} instead.
36+
* @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor,
37+
* int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
3838
*/
3939
@Deprecated
4040
void asyncReadEntriesOrWait(ManagedCursor cursor,

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@
3939
public class CompactedTopicUtils {
4040

4141
@Beta
42-
public static void readCompactedEntries(TopicCompactionService topicCompactionService, ManagedCursor cursor,
43-
int numberOfEntriesToRead, boolean readFromEarliest,
44-
AsyncCallbacks.ReadEntriesCallback callback, @Nullable Consumer consumer) {
42+
public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService,
43+
ManagedCursor cursor, int numberOfEntriesToRead,
44+
long bytesToRead, boolean readFromEarliest,
45+
AsyncCallbacks.ReadEntriesCallback callback,
46+
boolean wait, @Nullable Consumer consumer) {
4547
Objects.requireNonNull(topicCompactionService);
4648
Objects.requireNonNull(cursor);
4749
checkArgument(numberOfEntriesToRead > 0);
@@ -64,7 +66,13 @@ public static void readCompactedEntries(TopicCompactionService topicCompactionSe
6466
if (lastCompactedPosition == null
6567
|| readPosition.compareTo(
6668
lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
67-
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
69+
if (wait) {
70+
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, bytesToRead, callback, readEntriesCtx,
71+
PositionImpl.LATEST);
72+
} else {
73+
cursor.asyncReadEntries(numberOfEntriesToRead, bytesToRead, callback, readEntriesCtx,
74+
PositionImpl.LATEST);
75+
}
6876
return CompletableFuture.completedFuture(null);
6977
}
7078

0 commit comments

Comments
 (0)