Skip to content

Commit 03b497b

Browse files
committed
[fix][broker] replication does not work due to the mixed and repetitive sending of user messages and replication markers (#24453)
(cherry picked from commit af27c43)
1 parent 6e3dd42 commit 03b497b

File tree

2 files changed

+128
-5
lines changed

2 files changed

+128
-5
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.Mockito.doAnswer;
2424
import static org.mockito.Mockito.spy;
2525
import static org.testng.Assert.assertEquals;
26+
import static org.testng.Assert.assertNotNull;
2627
import static org.testng.Assert.assertNull;
2728
import static org.testng.Assert.assertTrue;
2829
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -50,8 +51,10 @@
5051
import org.apache.pulsar.broker.BrokerTestUtil;
5152
import org.apache.pulsar.broker.PulsarService;
5253
import org.apache.pulsar.broker.ServiceConfiguration;
54+
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
5355
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
5456
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
57+
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
5558
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
5659
import org.apache.pulsar.client.api.Consumer;
5760
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -72,6 +75,7 @@
7275
import org.apache.pulsar.common.api.proto.BaseCommand;
7376
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
7477
import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
78+
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
7579
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
7680
import org.apache.pulsar.common.protocol.ByteBufPair;
7781
import org.apache.pulsar.common.protocol.Commands;
@@ -154,17 +158,117 @@ protected Runnable injectReplicatorClientCnx(
154158
final var replicationClients = brokerService.getReplicationClients();
155159
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
156160
PulsarClientImpl injectedClient = InjectedClientCnxClientBuilder.create(clientBuilder2, clientCnxFactory);
157-
assertTrue(replicationClients.remove(cluster2, internalClient));
161+
if (internalClient != null) {
162+
assertTrue(replicationClients.remove(cluster2, internalClient));
163+
}
158164
assertNull(replicationClients.putIfAbsent(cluster2, injectedClient));
159165

160166
// Return a cleanup injection task;
161167
return () -> {
162168
assertTrue(replicationClients.remove(cluster2, injectedClient));
163-
assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
169+
if (internalClient != null) {
170+
assertNull(replicationClients.putIfAbsent(cluster2, internalClient));
171+
}
164172
injectedClient.closeAsync();
165173
};
166174
}
167175

176+
@Test
177+
public void testRepeatedlyPublishMixedMessageAndReplMarkers() throws Exception {
178+
// Inject a mechanism to drop all send receipt, to implement a scenario: the messages of the internal producer
179+
// of the replicator will be resent after a disconnection.
180+
AtomicBoolean stuckSendReceipt = new AtomicBoolean(true);
181+
Runnable cleanInjection = injectReplicatorClientCnx((conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
182+
@Override
183+
protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
184+
if (stuckSendReceipt.get()) {
185+
// discard all send receipt, to make the producer republish.
186+
} else {
187+
super.handleSendReceipt(sendReceipt);
188+
}
189+
}
190+
});
191+
192+
// Create topics.
193+
// - Enable deduplication on the remote cluster.
194+
// - Enable replicated subscription.
195+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
196+
final String subscription = "s1";
197+
admin1.topics().createNonPartitionedTopic(topicName);
198+
Consumer<Integer> consumer1 = client1.newConsumer(Schema.INT32).topic(topicName).subscriptionName(subscription)
199+
.replicateSubscriptionState(true).subscribe();
200+
admin2.topics().createSubscription(topicName, subscription, MessageId.earliest);
201+
consumer1.close();
202+
waitReplicatorStarted(topicName);
203+
PersistentTopic persistentTopic1 =
204+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
205+
PersistentTopic persistentTopic2 =
206+
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
207+
admin2.topicPolicies().setDeduplicationStatus(topicName, true);
208+
Awaitility.await().untilAsserted(() -> {
209+
MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication();
210+
assertEquals(String.valueOf(messageDeduplication2.getStatus()), "Enabled");
211+
});
212+
assertTrue(persistentTopic1.getSubscriptions().get(subscription).isReplicated());
213+
assertTrue(persistentTopic1.getReplicatedSubscriptionController().isPresent());
214+
ReplicatedSubscriptionsController replicatedSubscriptionsController =
215+
persistentTopic1.getReplicatedSubscriptionController().get();
216+
admin2.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
217+
218+
// Publish mixed user messages and replicated markers/
219+
// - Publish a user msg.
220+
// - Replicated marker will be sent internally per seconds
221+
Producer<Integer> producer = client1.newProducer(Schema.INT32).topic(topicName).create();
222+
int messageCount = 15;
223+
for (int i = 0; i < messageCount; i++) {
224+
producer.send(i);
225+
log.info("Sent message: {}", i);
226+
// The replicated marker will be sent internally per seconds. We wait 1.2s here, a replicated marker will
227+
// be sent.
228+
Thread.sleep(1200);
229+
log.info("latest replication snapshot: {}", replicatedSubscriptionsController.getLastCompletedSnapshotId());
230+
}
231+
producer.close();
232+
233+
//
234+
GeoPersistentReplicator replicator = (GeoPersistentReplicator) persistentTopic1.getReplicators().get(cluster2);
235+
long backlog = replicator.getCursor().getNumberOfEntriesInBacklog(true);
236+
Awaitility.await().untilAsserted(() -> {
237+
log.info("replication backlog: {}", backlog);
238+
assertTrue(backlog >= messageCount * 2);
239+
});
240+
241+
// 1. Remove the injection, which discarded send receipt of the internal producer of the replicator.
242+
// 2. Trigger a reconnection of the internal producer of the replicator.
243+
// The internal producer will retry to publish.
244+
stuckSendReceipt.set(false);
245+
replicator.producer.getClientCnx().ctx().channel().close();
246+
Producer<Integer> producer2 = client1.newProducer(Schema.INT32).topic(topicName).create();
247+
producer2.close();
248+
waitReplicatorStarted(topicName);
249+
250+
// Verify: all messages are sent, and no duplicated messages.
251+
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
252+
assertEquals(replicator.getCursor().getNumberOfEntriesInBacklog(true), 0);
253+
});
254+
Consumer<Integer> consumer2 = client2.newConsumer(Schema.INT32).topic(topicName).subscriptionName(subscription)
255+
.replicateSubscriptionState(true).subscribe();
256+
for (int i = 0; i < messageCount; i++) {
257+
Message<Integer> msg = consumer2.receive(2, TimeUnit.SECONDS);
258+
assertNotNull(msg);
259+
log.info("Received message: {}", msg.getValue());
260+
assertEquals(msg.getValue(), i);
261+
}
262+
263+
// cleanup.
264+
cleanInjection.run();
265+
admin1.topics().deleteSubscription(topicName, subscription);
266+
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
267+
admin2.topics().setReplicationClusters(topicName, Arrays.asList(cluster2));
268+
admin1.topics().unload(topicName);
269+
admin2.topics().unload(topicName);
270+
}
271+
168272
@DataProvider(name = "deduplicationArgs")
169273
public Object[][] deduplicationArgs() {
170274
return new Object[][] {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.pulsar.client.api.Schema;
2929
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
3030
import org.apache.pulsar.common.api.proto.KeyValue;
31+
import org.apache.pulsar.common.api.proto.MarkerType;
3132
import org.apache.pulsar.common.naming.TopicName;
3233
import org.apache.pulsar.common.protocol.Markers;
3334

@@ -160,9 +161,27 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
160161
return;
161162
}
162163

163-
// Case-4: Unexpected
164-
// 4-1: got null source cluster's entry position, which is unexpected.
165-
// 4-2: unknown error, which is unexpected.
164+
// Case-4: Received an out-of-order msg send receipt, and the first item in pending queue is a marker.
165+
if (op.msg.getMessageBuilder().hasMarkerType()
166+
&& Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType())) {
167+
log.warn("[{}] [{}] Received an out-of-order msg send receipt because enabled replicated subscription,"
168+
+ " which is expected, it always happens when repeatedly publishing a pair of mixed"
169+
+ " replication marker messages and user messages."
170+
+ " source position {}:{}, pending send[marker type: {}]: {}:{}, latest persisted: {}:{}."
171+
+ " Drop the pending publish marker command because it is a marker and it almost no effect.",
172+
topic, producerName, sourceLId, sourceEId,
173+
MarkerType.valueOf(op.msg.getMessageBuilder().getMarkerType()), pendingLId, pendingEId,
174+
lastPersistedSourceLedgerId, lastPersistedSourceEntryId);
175+
// Drop pending marker. The next ack receipt of this marker message will be dropped after it come in.
176+
ackReceivedReplMarker(cnx, op, op.sequenceId, -1 /*non-batch message*/, -1, -1);
177+
// Handle the current send receipt.
178+
ackReceived(cnx, sourceLId, sourceEId, targetLId, targetEid);
179+
return;
180+
}
181+
182+
// Case-5: Unexpected
183+
// 5-1: got null source cluster's entry position, which is unexpected.
184+
// 5-2: unknown error, which is unexpected.
166185
log.error("[{}] [{}] Received an msg send receipt[error]: source entry {}:{}, target entry: {}:{},"
167186
+ " pending send: {}:{}, latest persisted: {}:{}, queue-size: {}",
168187
topic, producerName, sourceLId, sourceEId, targetLId, targetEid, pendingLId, pendingEId,

0 commit comments

Comments
 (0)