Skip to content

Commit 02b25a3

Browse files
authored
[fix] [test] fix flaky test PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (apache#19574)
1 parent 8ec6511 commit 02b25a3

File tree

2 files changed

+131
-55
lines changed

2 files changed

+131
-55
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java

Lines changed: 61 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import static org.testng.AssertJUnit.assertTrue;
3838
import io.netty.buffer.ByteBuf;
3939
import io.netty.channel.ChannelHandlerContext;
40+
import io.netty.channel.EventLoopGroup;
41+
import io.netty.util.concurrent.DefaultThreadFactory;
4042
import java.lang.reflect.Field;
4143
import java.net.InetSocketAddress;
4244
import java.util.ArrayList;
@@ -47,6 +49,7 @@
4749
import java.util.concurrent.LinkedBlockingQueue;
4850
import java.util.concurrent.TimeUnit;
4951
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
52+
import java.util.concurrent.atomic.AtomicReference;
5053
import java.util.function.Supplier;
5154
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
5255
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -74,6 +77,8 @@
7477
import org.apache.pulsar.common.api.proto.ProtocolVersion;
7578
import org.apache.pulsar.common.naming.NamespaceBundle;
7679
import org.apache.pulsar.common.naming.TopicName;
80+
import org.apache.pulsar.common.util.netty.EventLoopUtil;
81+
import org.awaitility.Awaitility;
7782
import org.slf4j.Logger;
7883
import org.slf4j.LoggerFactory;
7984
import org.testng.Assert;
@@ -399,9 +404,54 @@ public void testAddRemoveConsumer() throws Exception {
399404
assertTrue(pdfc.canUnsubscribe(consumer1));
400405
}
401406

402-
@Test
407+
private String[] sortConsumerNameByHashSelector(String...consumerNames) throws Exception {
408+
String[] result = new String[consumerNames.length];
409+
PersistentTopic topic =
410+
new PersistentTopic(successTopicName, ledgerMock, pulsarTestContext.getBrokerService());
411+
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);
412+
int partitionIndex = -1;
413+
PersistentDispatcherSingleActiveConsumer dispatcher = new PersistentDispatcherSingleActiveConsumer(cursorMock,
414+
SubType.Failover, partitionIndex, topic, sub);
415+
for (String consumerName : consumerNames){
416+
Consumer consumer = spy(new Consumer(sub, SubType.Failover, topic.getName(), 999 /* consumer id */, 1,
417+
consumerName/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
418+
false /* read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH));
419+
dispatcher.addConsumer(consumer);
420+
}
421+
for (int i = 0; i < consumerNames.length; i++) {
422+
result[i] = dispatcher.getActiveConsumer().consumerName();
423+
dispatcher.removeConsumer(dispatcher.getActiveConsumer());
424+
}
425+
consumerChanges.clear();
426+
return result;
427+
}
428+
429+
private CommandActiveConsumerChange waitActiveChangeEvent(int consumerId)
430+
throws Exception {
431+
AtomicReference<CommandActiveConsumerChange> res = new AtomicReference<>();
432+
Awaitility.await().until(() -> {
433+
while (!consumerChanges.isEmpty()){
434+
CommandActiveConsumerChange change = consumerChanges.take();
435+
if (change.getConsumerId() == consumerId){
436+
res.set(change);
437+
return true;
438+
}
439+
}
440+
return false;
441+
});
442+
consumerChanges.clear();
443+
return res.get();
444+
}
445+
446+
@Test(invocationCount = 100)
403447
public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
404448
log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
449+
String[] sortedConsumerNameByHashSelector = sortConsumerNameByHashSelector("Cons1", "Cons2");
450+
BrokerService spyBrokerService = spy(pulsarTestContext.getBrokerService());
451+
final EventLoopGroup singleEventLoopGroup = EventLoopUtil.newEventLoopGroup(1,
452+
pulsarTestContext.getBrokerService().getPulsar().getConfig().isEnableBusyWait(),
453+
new DefaultThreadFactory("pulsar-io"));
454+
doAnswer(invocation -> singleEventLoopGroup).when(spyBrokerService).executor();
405455

406456
PersistentTopic topic =
407457
new PersistentTopic(successTopicName, ledgerMock, pulsarTestContext.getBrokerService());
@@ -417,23 +467,26 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
417467

418468
// 2. Add a consumer
419469
Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1,
420-
"Cons1"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
470+
sortedConsumerNameByHashSelector[0]/* consumer name */,
471+
true, serverCnx, "myrole-1", Collections.emptyMap(),
421472
false /* read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH));
422473
pdfc.addConsumer(consumer1);
423474
List<Consumer> consumers = pdfc.getConsumers();
424475
assertEquals(1, consumers.size());
425476
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
477+
waitActiveChangeEvent(1);
426478

427479
// 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order.
428480
Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1,
429-
"Cons2"/* consumer name */, true, serverCnx, "myrole-1", Collections.emptyMap(),
481+
sortedConsumerNameByHashSelector[1]/* consumer name */,
482+
true, serverCnx, "myrole-1", Collections.emptyMap(),
430483
false /* read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH));
431484
pdfc.addConsumer(consumer2);
432485

433486
// 4. Verify active consumer doesn't change
434487
consumers = pdfc.getConsumers();
435488
assertEquals(2, consumers.size());
436-
CommandActiveConsumerChange change = consumerChanges.take();
489+
CommandActiveConsumerChange change = waitActiveChangeEvent(2);
437490
verifyActiveConsumerChange(change, 2, false);
438491
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
439492
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -444,21 +497,10 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
444497
pdfc.addConsumer(consumer3);
445498
consumers = pdfc.getConsumers();
446499
assertEquals(3, consumers.size());
447-
change = consumerChanges.take();
448-
verifyActiveConsumerChange(change, 3, false);
449-
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
450-
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer1));
451-
452-
// 7. Remove first consumer and active consumer should change to consumer2 since it's added before consumer3
453-
// though consumer 3 has higher priority level
454-
pdfc.removeConsumer(consumer1);
455-
consumers = pdfc.getConsumers();
456-
assertEquals(2, consumers.size());
457-
change = consumerChanges.take();
458-
verifyActiveConsumerChange(change, 2, true);
459-
assertSame(pdfc.getActiveConsumer().consumerName(), consumer2.consumerName());
460-
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer2));
461-
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer2));
500+
change = waitActiveChangeEvent(3);
501+
verifyActiveConsumerChange(change, 3, true);
502+
assertSame(pdfc.getActiveConsumer().consumerName(), consumer3.consumerName());
503+
verify(consumer3, times(1)).notifyActiveConsumerChange(same(consumer3));
462504
}
463505

464506
@Test

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java

Lines changed: 70 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.LinkedBlockingQueue;
3333
import java.util.concurrent.TimeUnit;
34+
import lombok.AllArgsConstructor;
3435
import org.apache.pulsar.broker.BrokerTestUtil;
3536
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
3637
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -107,14 +108,17 @@ private void verifyConsumerActive(TestConsumerStateEventListener listener, int p
107108
Integer pid = listener.activeQueue.take();
108109
assertNotNull(pid);
109110
assertEquals(partitionId, pid.intValue());
110-
assertNull(listener.inActiveQueue.poll());
111111
}
112112

113113
private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception {
114114
Integer pid = listener.inActiveQueue.take();
115115
assertNotNull(pid);
116116
assertEquals(partitionId, pid.intValue());
117-
assertNull(listener.activeQueue.poll());
117+
}
118+
119+
private void clearEventQueue(TestConsumerStateEventListener listener) {
120+
listener.inActiveQueue.clear();
121+
listener.activeQueue.clear();
118122
}
119123

120124
private static class ActiveInactiveListenerEvent implements ConsumerEventListener {
@@ -135,27 +139,57 @@ public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
135139
}
136140
}
137141

142+
@AllArgsConstructor
143+
static class FailoverConsumer {
144+
private String consumerName;
145+
private Consumer<byte[]> consumer;
146+
private TestConsumerStateEventListener listener;
147+
private PersistentDispatcherSingleActiveConsumer dispatcher;
148+
private boolean isActiveConsumer(){
149+
return dispatcher.getActiveConsumer().consumerName().equals(consumerName);
150+
}
151+
}
152+
153+
FailoverConsumer createConsumer(String topicName, String subName, String listenerName, String consumerName)
154+
throws Exception {
155+
TestConsumerStateEventListener listener = new TestConsumerStateEventListener(listenerName);
156+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
157+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
158+
.subscriptionType(SubscriptionType.Failover)
159+
.consumerName(consumerName)
160+
.consumerEventListener(listener)
161+
.subscribe();
162+
PersistentDispatcherSingleActiveConsumer dispatcher =
163+
(PersistentDispatcherSingleActiveConsumer) pulsar.getBrokerService()
164+
.getTopic(topicName, false).get().get()
165+
.getSubscription(subName)
166+
.getDispatcher();
167+
return new FailoverConsumer(consumerName, consumer, listener, dispatcher);
168+
}
169+
138170
@Test
139171
public void testSimpleConsumerEventsWithoutPartition() throws Exception {
140172
final String topicName = "persistent://prop/use/ns-abc/failover-topic1-" + System.currentTimeMillis();
141173
final String subName = "sub1";
142174
final int numMsgs = 100;
143175

144-
TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener("listener-1");
145-
TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener("listener-2");
146-
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
147-
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);
148-
176+
// 1. Registry two consumers.
177+
FailoverConsumer failoverConsumer1 = createConsumer(topicName, subName, "l1", "c1");
178+
FailoverConsumer failoverConsumer2 = createConsumer(topicName, subName, "l2", "c2");
179+
FailoverConsumer firstConsumer;
180+
FailoverConsumer secondConsumer;
181+
if (failoverConsumer1.isActiveConsumer()){
182+
firstConsumer = failoverConsumer1;
183+
secondConsumer = failoverConsumer2;
184+
} else {
185+
firstConsumer = failoverConsumer2;
186+
secondConsumer = failoverConsumer1;
187+
}
149188

150-
// 1. two consumers on the same subscription
151-
ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1")
152-
.consumerEventListener(listener1);
153-
Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
154-
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
155-
.subscribe();
156-
verifyConsumerActive(listener1, -1);
157-
verifyConsumerInactive(listener2, -1);
158-
listener2.inActiveQueue.clear();
189+
verifyConsumerActive(firstConsumer.listener, -1);
190+
verifyConsumerInactive(secondConsumer.listener, -1);
191+
clearEventQueue(firstConsumer.listener);
192+
clearEventQueue(secondConsumer.listener);
159193

160194
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
161195
PersistentSubscription subRef = topicRef.getSubscription(subName);
@@ -185,14 +219,14 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
185219
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
186220
});
187221

188-
// 3. consumer1 should have all the messages while consumer2 should have no messages
222+
// 3. firstConsumer should have all the messages while secondConsumer should have no messages
189223
Message<byte[]> msg = null;
190-
Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
224+
Assert.assertNull(secondConsumer.consumer.receive(100, TimeUnit.MILLISECONDS));
191225
for (int i = 0; i < numMsgs; i++) {
192-
msg = consumer1.receive(1, TimeUnit.SECONDS);
226+
msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
193227
Assert.assertNotNull(msg);
194228
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
195-
consumer1.acknowledge(msg);
229+
firstConsumer.consumer.acknowledge(msg);
196230
}
197231

198232
rolloverPerIntervalStats();
@@ -211,51 +245,52 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
211245

212246
// 5. master consumer failure should resend unacked messages and new messages to another consumer
213247
for (int i = 0; i < 5; i++) {
214-
msg = consumer1.receive(1, TimeUnit.SECONDS);
248+
msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
215249
Assert.assertNotNull(msg);
216250
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
217-
consumer1.acknowledge(msg);
251+
firstConsumer.consumer.acknowledge(msg);
218252
}
219253
for (int i = 5; i < 10; i++) {
220-
msg = consumer1.receive(1, TimeUnit.SECONDS);
254+
msg = firstConsumer.consumer.receive(1, TimeUnit.SECONDS);
221255
Assert.assertNotNull(msg);
222256
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
223257
// do not ack
224258
}
225-
consumer1.close();
259+
firstConsumer.consumer.close();
226260

227261
Awaitility.await().untilAsserted(() -> {
228-
verifyConsumerActive(listener2, -1);
229-
verifyConsumerNotReceiveAnyStateChanges(listener1);
262+
verifyConsumerActive(secondConsumer.listener, -1);
263+
verifyConsumerNotReceiveAnyStateChanges(firstConsumer.listener);
264+
clearEventQueue(firstConsumer.listener);
265+
clearEventQueue(secondConsumer.listener);
230266
});
231267

232268
for (int i = 5; i < numMsgs; i++) {
233-
msg = consumer2.receive(1, TimeUnit.SECONDS);
269+
msg = secondConsumer.consumer.receive(1, TimeUnit.SECONDS);
234270
Assert.assertNotNull(msg);
235271
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
236-
consumer2.acknowledge(msg);
272+
secondConsumer.consumer.acknowledge(msg);
237273
}
238-
Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
274+
Assert.assertNull(secondConsumer.consumer.receive(100, TimeUnit.MILLISECONDS));
239275

240276
rolloverPerIntervalStats();
241277
Awaitility.await().untilAsserted(() -> {
242278
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
243-
244279
});
245280

246281
// 8. unsubscribe not allowed if multiple consumers connected
247282
try {
248-
consumer1.unsubscribe();
283+
firstConsumer.consumer.unsubscribe();
249284
fail("should fail");
250285
} catch (PulsarClientException e) {
251286
// ok
252287
}
253288

254-
// 9. unsubscribe allowed if there is a lone consumer
255-
consumer1.close();
289+
// 9. unsubscribe allowed if there is alone consumer
290+
firstConsumer.consumer.close();
256291
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
257292
try {
258-
consumer2.unsubscribe();
293+
secondConsumer.consumer.unsubscribe();
259294
} catch (PulsarClientException e) {
260295
fail("Should not fail", e);
261296
}
@@ -265,8 +300,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
265300
});
266301

267302
producer.close();
268-
consumer2.close();
269-
303+
secondConsumer.consumer.close();
270304
admin.topics().delete(topicName);
271305
}
272306

0 commit comments

Comments
 (0)