Skip to content

Commit 9be0b52

Browse files
authored
[fix][broker] In replication scenario, remote consumer could not be registered if there has no message was sent (apache#20888)
Motivation: In the replication scenario, we want to produce messages on the native cluster and consume messages on the remote cluster, the producer and consumer both use a same schema, but the consumer cannot be registered if there has no messages in the topic yet.The root cause is that for the remote cluster, there is a producer who has been registered with `AUTO_PRODUCE_BYTES` schema, so there is no schema to check the compatibility. Modifications: If there is no schema and only the replicator producer was registered, skip the compatibility check.
1 parent 5d72615 commit 9be0b52

File tree

4 files changed

+59
-2
lines changed

4 files changed

+59
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
139139

140140
private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
141141
AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
142-
private volatile int userCreatedProducerCount = 0;
142+
protected volatile int userCreatedProducerCount = 0;
143143

144144
protected volatile Optional<Long> topicEpoch = Optional.empty();
145145
private volatile boolean hasExclusiveProducer;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3272,7 +3272,7 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
32723272
.toList().size())
32733273
.sum();
32743274
if (hasSchema
3275-
|| (!producers.isEmpty())
3275+
|| (userCreatedProducerCount > 0)
32763276
|| (numActiveConsumersWithoutAutoSchema != 0)
32773277
|| (ledger.getTotalSize() != 0)) {
32783278
return checkSchemaCompatibleForConsumer(schema);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
24+
import java.util.Optional;
2325
import java.util.concurrent.TimeUnit;
2426
import org.apache.pulsar.broker.BrokerTestUtil;
27+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2528
import org.apache.pulsar.client.api.Consumer;
2629
import org.apache.pulsar.client.api.MessageId;
2730
import org.apache.pulsar.client.api.Producer;
31+
import org.apache.pulsar.client.api.Schema;
2832
import org.apache.pulsar.common.policies.data.TopicStats;
33+
import org.awaitility.Awaitility;
2934
import org.testng.annotations.AfterClass;
3035
import org.testng.annotations.BeforeClass;
3136
import org.testng.annotations.Test;
@@ -75,4 +80,29 @@ public void testReplicatorProducerStatInTopic() throws Exception {
7580
admin2.topics().delete(topicName);
7681
});
7782
}
83+
84+
@Test
85+
public void testCreateRemoteConsumerFirst() throws Exception {
86+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
87+
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
88+
// Wait for replicator started.
89+
Awaitility.await().untilAsserted(() -> {
90+
Optional<Topic> topicOptional2 = pulsar2.getBrokerService().getTopic(topicName, false).get();
91+
assertTrue(topicOptional2.isPresent());
92+
PersistentTopic persistentTopic2 = (PersistentTopic) topicOptional2.get();
93+
assertFalse(persistentTopic2.getProducers().isEmpty());
94+
});
95+
// The topic in cluster2 has a replicator created producer(schema Auto_Produce), but does not have any schema。
96+
// Verify: the consumer of this cluster2 can create successfully.
97+
Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1")
98+
.subscribe();;
99+
100+
// cleanup.
101+
producer1.close();
102+
consumer2.close();
103+
cleanupTopics(() -> {
104+
admin1.topics().delete(topicName);
105+
admin2.topics().delete(topicName);
106+
});
107+
}
78108
}

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,6 +1290,33 @@ private void testIncompatibleSchema() throws Exception {
12901290
assertThrows(SchemaSerializationException.class, message2::getValue);
12911291
}
12921292

1293+
/**
1294+
* This test just ensure that schema check still keeps the original logic: if there has any producer, but no schema
1295+
* was registered, the new consumer could not register new schema.
1296+
* TODO: I think this design should be improved: if a producer used "AUTO_PRODUCE_BYTES" schema, we should allow
1297+
* the new consumer to register new schema. But before we can solve this problem, we need to modify
1298+
* "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES".
1299+
*/
1300+
@Test
1301+
public void testAutoProduceAndSpecifiedConsumer() throws Exception {
1302+
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
1303+
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
1304+
final String topicName = "persistent://" + namespace + "/tp_" + randomName(16);
1305+
admin.topics().createNonPartitionedTopic(topicName);
1306+
1307+
Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
1308+
try {
1309+
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
1310+
fail("Should throw ex: Topic does not have schema to check");
1311+
} catch (Exception ex){
1312+
assertTrue(ex.getMessage().contains("Topic does not have schema to check"));
1313+
}
1314+
1315+
// Cleanup.
1316+
producer.close();
1317+
admin.topics().delete(topicName);
1318+
}
1319+
12931320
@Test
12941321
public void testCreateSchemaInParallel() throws Exception {
12951322
final String namespace = "test-namespace-" + randomName(16);

0 commit comments

Comments
 (0)