Skip to content

Commit 4ccb5bb

Browse files
[improve][broker] Add the MessageExpirer interface to make code clear (apache#20800)
Co-authored-by: tison <[email protected]>
1 parent 9256407 commit 4ccb5bb

File tree

5 files changed

+60
-53
lines changed

5 files changed

+60
-53
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
8181
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
8282
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
83+
import org.apache.pulsar.broker.service.MessageExpirer;
8384
import org.apache.pulsar.broker.service.Subscription;
8485
import org.apache.pulsar.broker.service.Topic;
8586
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
@@ -3955,28 +3956,20 @@ private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartit
39553956
}
39563957
PersistentTopic topic = (PersistentTopic) t;
39573958

3958-
boolean issued;
3959+
final MessageExpirer messageExpirer;
39593960
if (subName.startsWith(topic.getReplicatorPrefix())) {
39603961
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
3961-
PersistentReplicator repl = (PersistentReplicator) topic
3962-
.getPersistentReplicator(remoteCluster);
3963-
if (repl == null) {
3964-
resultFuture.completeExceptionally(
3965-
new RestException(Status.NOT_FOUND, "Replicator not found"));
3966-
return;
3967-
}
3968-
issued = repl.expireMessages(expireTimeInSeconds);
3962+
messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
39693963
} else {
3970-
PersistentSubscription sub = topic.getSubscription(subName);
3971-
if (sub == null) {
3972-
resultFuture.completeExceptionally(
3973-
new RestException(Status.NOT_FOUND,
3974-
getSubNotFoundErrorMessage(topicName.toString(), subName)));
3975-
return;
3976-
}
3977-
issued = sub.expireMessages(expireTimeInSeconds);
3964+
messageExpirer = topic.getSubscription(subName);
3965+
}
3966+
if (messageExpirer == null) {
3967+
final String message = subName.startsWith(topic.getReplicatorPrefix())
3968+
? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName);
3969+
resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, message));
3970+
return;
39783971
}
3979-
if (issued) {
3972+
if (messageExpirer.expireMessages(expireTimeInSeconds)) {
39803973
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
39813974
expireTimeInSeconds, topicName, subName);
39823975
resultFuture.complete(null);
@@ -4066,44 +4059,27 @@ private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosit
40664059
return;
40674060
}
40684061
try {
4069-
PersistentSubscription sub = null;
4070-
PersistentReplicator repl = null;
4071-
4062+
final MessageExpirer messageExpirer;
40724063
if (subName.startsWith(topic.getReplicatorPrefix())) {
40734064
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
4074-
repl = (PersistentReplicator)
4075-
topic.getPersistentReplicator(remoteCluster);
4076-
if (repl == null) {
4077-
asyncResponse.resume(new RestException(Status.NOT_FOUND,
4078-
"Replicator not found"));
4079-
return;
4080-
}
4065+
messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
40814066
} else {
4082-
sub = topic.getSubscription(subName);
4083-
if (sub == null) {
4084-
asyncResponse.resume(new RestException(Status.NOT_FOUND,
4085-
getSubNotFoundErrorMessage(topicName.toString(), subName)));
4086-
return;
4087-
}
4067+
messageExpirer = topic.getSubscription(subName);
4068+
}
4069+
if (messageExpirer == null) {
4070+
final String message = (subName.startsWith(topic.getReplicatorPrefix()))
4071+
? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName);
4072+
asyncResponse.resume(new RestException(Status.NOT_FOUND, message));
4073+
return;
40884074
}
40894075

40904076
CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
40914077
getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);
40924078

4093-
PersistentReplicator finalRepl = repl;
4094-
PersistentSubscription finalSub = sub;
4095-
40964079
batchSizeFuture.thenAccept(bi -> {
40974080
PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
4098-
boolean issued;
40994081
try {
4100-
if (subName.startsWith(topic.getReplicatorPrefix())) {
4101-
issued = finalRepl.expireMessages(position);
4102-
} else {
4103-
issued = finalSub.expireMessages(position);
4104-
}
4105-
4106-
if (issued) {
4082+
if (messageExpirer.expireMessages(position)) {
41074083
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position,
41084084
topicName, subName);
41094085
} else {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import org.apache.bookkeeper.mledger.Position;
22+
import org.apache.pulsar.common.classification.InterfaceStability;
23+
24+
@InterfaceStability.Evolving
25+
public interface MessageExpirer {
26+
27+
boolean expireMessages(Position position);
28+
29+
boolean expireMessages(int messageTTLInSeconds);
30+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
3131
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
3232

33-
public interface Subscription {
33+
public interface Subscription extends MessageExpirer {
3434

3535
BrokerInterceptor interceptor();
3636

@@ -84,10 +84,6 @@ default long getNumberOfEntriesDelayed() {
8484

8585
CompletableFuture<Entry> peekNthMessage(int messagePosition);
8686

87-
boolean expireMessages(int messageTTLInSeconds);
88-
89-
boolean expireMessages(Position position);
90-
9187
void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch);
9288

9389
void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,16 @@
3232
import org.apache.bookkeeper.mledger.Position;
3333
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
3434
import org.apache.bookkeeper.mledger.impl.PositionImpl;
35+
import org.apache.pulsar.broker.service.MessageExpirer;
3536
import org.apache.pulsar.client.impl.MessageImpl;
3637
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
3738
import org.apache.pulsar.common.protocol.Commands;
3839
import org.apache.pulsar.common.stats.Rate;
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
41-
4242
/**
4343
*/
44-
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
44+
public class PersistentMessageExpiryMonitor implements FindEntryCallback, MessageExpirer {
4545
private final ManagedCursor cursor;
4646
private final String subName;
4747
private final PersistentTopic topic;
@@ -73,6 +73,7 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription
7373
&& this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
7474
}
7575

76+
@Override
7677
public boolean expireMessages(int messageTTLInSeconds) {
7778
if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
7879
log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
@@ -99,6 +100,7 @@ public boolean expireMessages(int messageTTLInSeconds) {
99100
}
100101
}
101102

103+
@Override
102104
public boolean expireMessages(Position messagePosition) {
103105
// If it's beyond last position of this topic, do nothing.
104106
PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.pulsar.broker.service.AbstractReplicator;
4848
import org.apache.pulsar.broker.service.BrokerService;
4949
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
50+
import org.apache.pulsar.broker.service.MessageExpirer;
5051
import org.apache.pulsar.broker.service.Replicator;
5152
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
5253
import org.apache.pulsar.client.api.MessageId;
@@ -66,7 +67,7 @@
6667
import org.slf4j.LoggerFactory;
6768

6869
public abstract class PersistentReplicator extends AbstractReplicator
69-
implements Replicator, ReadEntriesCallback, DeleteCallback {
70+
implements Replicator, ReadEntriesCallback, DeleteCallback, MessageExpirer {
7071

7172
protected final PersistentTopic topic;
7273
protected final ManagedCursor cursor;
@@ -600,6 +601,7 @@ private long getReplicationDelayInSeconds() {
600601
return 0L;
601602
}
602603

604+
@Override
603605
public boolean expireMessages(int messageTTLInSeconds) {
604606
if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
605607
|| (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
@@ -611,6 +613,7 @@ public boolean expireMessages(int messageTTLInSeconds) {
611613
return expiryMonitor.expireMessages(messageTTLInSeconds);
612614
}
613615

616+
@Override
614617
public boolean expireMessages(Position position) {
615618
return expiryMonitor.expireMessages(position);
616619
}

0 commit comments

Comments
 (0)