Skip to content

Commit bf407f2

Browse files
authored
[fix][broker]fix Repeated messages of shared streaming dispatcher (apache#18289)
* [fix][test]fix flaky test SimpleProducerConsumerTestStreamingDispatcherTest.testSharedSamePriorityConsumer * remove unnecessary change
1 parent 681d51d commit bf407f2

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
2222
import com.google.common.collect.Lists;
23+
import java.util.List;
2324
import java.util.Set;
2425
import java.util.concurrent.TimeUnit;
2526
import lombok.extern.slf4j.Slf4j;
@@ -52,6 +53,18 @@ public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic topic, Man
5253
super(topic, cursor, subscription);
5354
}
5455

56+
protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries,
57+
boolean isLastEntryInBatch) {
58+
sendInProgress = true;
59+
try {
60+
return trySendMessagesToConsumers(readType, entries);
61+
} finally {
62+
if (isLastEntryInBatch) {
63+
sendInProgress = false;
64+
}
65+
}
66+
}
67+
5568
/**
5669
* {@inheritDoc}
5770
*/
@@ -100,12 +113,12 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest
100113
// in a separate thread, and we want to prevent more reads
101114
sendInProgress = true;
102115
dispatchMessagesThread.execute(safeRun(() -> {
103-
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
116+
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), ctx.isLast())) {
104117
readMoreEntries();
105118
}
106119
}));
107120
} else {
108-
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
121+
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), ctx.isLast())) {
109122
readMoreEntriesAsync();
110123
}
111124
}

0 commit comments

Comments
 (0)