Skip to content

Commit e13865c

Browse files
authored
[fix] [client] fix memory leak if enabled pooled messages (apache#19585)
1 parent e0fe818 commit e13865c

File tree

4 files changed

+52
-3
lines changed

4 files changed

+52
-3
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -947,9 +947,14 @@ public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Ex
947947
ByteBuf payload = ((MessageImpl) msg).getPayload();
948948
assertNotEquals(payload.refCnt(), 0);
949949
consumer.redeliverUnacknowledgedMessages();
950-
assertEquals(payload.refCnt(), 0);
950+
Awaitility.await().untilAsserted(() -> {
951+
assertTrue(consumer.incomingMessages.size() >= 100);
952+
});
951953
consumer.close();
952954
producer.close();
955+
admin.topics().delete(topic, false);
956+
assertEquals(consumer.incomingMessages.size(), 0);
957+
assertEquals(payload.refCnt(), 0);
953958
}
954959

955960
/**

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.Map;
3030
import java.util.Optional;
3131
import java.util.Set;
32-
import java.util.concurrent.BlockingQueue;
3332
import java.util.concurrent.CompletableFuture;
3433
import java.util.concurrent.ConcurrentLinkedQueue;
3534
import java.util.concurrent.ExecutionException;
@@ -82,7 +81,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
8281
protected final ExecutorService externalPinnedExecutor;
8382
protected final ExecutorService internalPinnedExecutor;
8483
protected UnAckedMessageTracker unAckedMessageTracker;
85-
final BlockingQueue<Message<T>> incomingMessages;
84+
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
8685
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
8786
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
8887
protected final int maxReceiverQueueSize;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,18 @@ private void closeConsumerTasks() {
10751075
}
10761076
negativeAcksTracker.close();
10771077
stats.getStatTimeout().ifPresent(Timeout::cancel);
1078+
if (poolMessages) {
1079+
releasePooledMessagesAndStopAcceptNew();
1080+
}
1081+
}
1082+
1083+
/**
1084+
* If enabled pooled messages, we should release the messages after closing consumer and stop accept the new
1085+
* messages.
1086+
*/
1087+
private void releasePooledMessagesAndStopAcceptNew() {
1088+
incomingMessages.terminate(message -> message.release());
1089+
clearIncomingMessages();
10781090
}
10791091

10801092
void activeConsumerChanged(boolean isActive) {

pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.locks.ReentrantLock;
3333
import java.util.concurrent.locks.StampedLock;
3434
import java.util.function.Consumer;
35+
import javax.annotation.Nullable;
3536

3637
/**
3738
* This implements a {@link BlockingQueue} backed by an array with no fixed capacity.
@@ -53,6 +54,10 @@ public class GrowableArrayBlockingQueue<T> extends AbstractQueue<T> implements B
5354
.newUpdater(GrowableArrayBlockingQueue.class, "size");
5455
private volatile int size = 0;
5556

57+
private volatile boolean terminated = false;
58+
59+
private volatile Consumer<T> itemAfterTerminatedHandler;
60+
5661
public GrowableArrayBlockingQueue() {
5762
this(64);
5863
}
@@ -132,6 +137,13 @@ public void put(T e) {
132137
boolean wasEmpty = false;
133138

134139
try {
140+
if (terminated){
141+
if (itemAfterTerminatedHandler != null) {
142+
itemAfterTerminatedHandler.accept(e);
143+
}
144+
return;
145+
}
146+
135147
if (SIZE_UPDATER.get(this) == data.length) {
136148
expandArray();
137149
}
@@ -401,6 +413,27 @@ public String toString() {
401413
return sb.toString();
402414
}
403415

416+
/**
417+
* Make the queue not accept new items. if there are still new data trying to enter the queue, it will be handed
418+
* by {@param itemAfterTerminatedHandler}.
419+
*/
420+
public void terminate(@Nullable Consumer<T> itemAfterTerminatedHandler) {
421+
// After wait for the in-flight item enqueue, it means the operation of terminate is finished.
422+
long stamp = tailLock.writeLock();
423+
try {
424+
terminated = true;
425+
if (itemAfterTerminatedHandler != null) {
426+
this.itemAfterTerminatedHandler = itemAfterTerminatedHandler;
427+
}
428+
} finally {
429+
tailLock.unlockWrite(stamp);
430+
}
431+
}
432+
433+
public boolean isTerminated() {
434+
return terminated;
435+
}
436+
404437
@SuppressWarnings("unchecked")
405438
private void expandArray() {
406439
// We already hold the tailLock

0 commit comments

Comments
 (0)