Skip to content

Commit cdeef00

Browse files
authored
[fix] [broker] Topic close failure leaves subscription in a permanent fence state (apache#19692)
Motivation : After a Topic close failure or a delete failure, the fence state will be reset to get the topic back to work,but it will not reset the fence state of the subscription, which will result in the consumer never being able to connect to the broker. Modifications: Reset the fence state of subscriptions if the operation of topic close is failed.
1 parent cbd799f commit cdeef00

File tree

3 files changed

+87
-8
lines changed

3 files changed

+87
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
126126
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
127127
private final PendingAckHandle pendingAckHandle;
128128
private volatile Map<String, String> subscriptionProperties;
129+
private volatile CompletableFuture<Void> fenceFuture;
129130

130131
static {
131132
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
@@ -897,27 +898,49 @@ public CompletableFuture<Void> close() {
897898
*/
898899
@Override
899900
public synchronized CompletableFuture<Void> disconnect() {
900-
CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
901+
if (fenceFuture != null){
902+
return fenceFuture;
903+
}
904+
fenceFuture = new CompletableFuture<>();
901905

902906
// block any further consumers on this subscription
903907
IS_FENCED_UPDATER.set(this, TRUE);
904908

905909
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
906910
.thenCompose(v -> close()).thenRun(() -> {
907911
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
908-
disconnectFuture.complete(null);
912+
fenceFuture.complete(null);
909913
}).exceptionally(exception -> {
910-
IS_FENCED_UPDATER.set(this, FALSE);
911-
if (dispatcher != null) {
912-
dispatcher.reset();
913-
}
914914
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
915915
exception);
916-
disconnectFuture.completeExceptionally(exception);
916+
fenceFuture.completeExceptionally(exception);
917+
resumeAfterFence();
917918
return null;
918919
});
920+
return fenceFuture;
921+
}
919922

920-
return disconnectFuture;
923+
/**
924+
* Resume subscription after topic deletion or close failure.
925+
*/
926+
public synchronized void resumeAfterFence() {
927+
// If "fenceFuture" is null, it means that "disconnect" has never been called.
928+
if (fenceFuture != null) {
929+
fenceFuture.whenComplete((ignore, ignoreEx) -> {
930+
synchronized (PersistentSubscription.this) {
931+
try {
932+
if (IS_FENCED_UPDATER.compareAndSet(this, TRUE, FALSE)) {
933+
if (dispatcher != null) {
934+
dispatcher.reset();
935+
}
936+
}
937+
fenceFuture = null;
938+
} catch (Exception ex) {
939+
log.error("[{}] Resume subscription [{}] failure", topicName, subName, ex);
940+
}
941+
}
942+
});
943+
}
921944
}
922945

923946
/**

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3234,6 +3234,7 @@ private void fenceTopicToCloseOrDelete() {
32343234
}
32353235

32363236
private void unfenceTopicToResume() {
3237+
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
32373238
isFenced = false;
32383239
isClosingOrDeleting = false;
32393240
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
package org.apache.pulsar.broker.service.persistent;
2020

2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyInt;
23+
import static org.mockito.Mockito.doAnswer;
2224
import static org.mockito.Mockito.doNothing;
25+
import static org.mockito.Mockito.doReturn;
2326
import static org.mockito.Mockito.mock;
2427
import static org.mockito.Mockito.spy;
2528
import static org.mockito.Mockito.times;
@@ -39,11 +42,15 @@
3942
import java.util.Collection;
4043
import java.util.List;
4144
import java.util.UUID;
45+
import java.util.concurrent.CompletableFuture;
4246
import java.util.concurrent.CountDownLatch;
47+
import java.util.concurrent.ExecutionException;
4348
import java.util.concurrent.TimeUnit;
49+
import java.util.concurrent.atomic.AtomicBoolean;
4450
import lombok.Cleanup;
4551
import org.apache.bookkeeper.client.LedgerHandle;
4652
import org.apache.bookkeeper.mledger.ManagedLedger;
53+
import org.apache.pulsar.broker.service.BrokerService;
4754
import org.apache.pulsar.broker.service.BrokerTestBase;
4855
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
4956
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -470,4 +477,52 @@ public void testCompatibilityWithPartitionKeyword() throws PulsarAdminException,
470477
Assert.assertThrows(PulsarAdminException.NotFoundException.class,
471478
() -> admin.topics().getPartitionedTopicMetadata(topicName));
472479
}
480+
481+
@Test
482+
public void testDeleteTopicFail() throws Exception {
483+
final String fullyTopicName = "persistent://prop/ns-abc/" + "tp_"
484+
+ UUID.randomUUID().toString().replaceAll("-", "");
485+
// Mock topic.
486+
BrokerService brokerService = spy(pulsar.getBrokerService());
487+
doReturn(brokerService).when(pulsar).getBrokerService();
488+
489+
// Create a sub, and send one message.
490+
Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(fullyTopicName).subscriptionName("sub1")
491+
.subscribe();
492+
consumer1.close();
493+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(fullyTopicName).create();
494+
producer.send("1");
495+
producer.close();
496+
497+
// Make a failed delete operation.
498+
AtomicBoolean makeDeletedFailed = new AtomicBoolean(true);
499+
PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopic(fullyTopicName, false).get().get();
500+
doAnswer(invocation -> {
501+
CompletableFuture future = (CompletableFuture) invocation.getArguments()[1];
502+
if (makeDeletedFailed.get()) {
503+
future.completeExceptionally(new RuntimeException("mock ex for test"));
504+
} else {
505+
future.complete(null);
506+
}
507+
return null;
508+
}).when(brokerService)
509+
.deleteTopicAuthenticationWithRetry(any(String.class), any(CompletableFuture.class), anyInt());
510+
try {
511+
persistentTopic.delete().get();
512+
} catch (Exception e) {
513+
org.testng.Assert.assertTrue(e instanceof ExecutionException);
514+
org.testng.Assert.assertTrue(e.getCause() instanceof java.lang.RuntimeException);
515+
org.testng.Assert.assertEquals(e.getCause().getMessage(), "mock ex for test");
516+
}
517+
518+
// Assert topic works after deleting failure.
519+
Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(fullyTopicName).subscriptionName("sub1")
520+
.subscribe();
521+
org.testng.Assert.assertEquals("1", consumer2.receive(2, TimeUnit.SECONDS).getValue());
522+
consumer2.close();
523+
524+
// Make delete success.
525+
makeDeletedFailed.set(false);
526+
persistentTopic.delete().get();
527+
}
473528
}

0 commit comments

Comments
 (0)