Skip to content

Commit 63d9eaf

Browse files
authored
[fix][broker] Fix incorrect number of read compacted entries (apache#20978)
1 parent 75d4d82 commit 63d9eaf

File tree

4 files changed

+51
-4
lines changed

4 files changed

+51
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
119119
return CompletableFuture.completedFuture(null);
120120
} else {
121121
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
122-
startPoint + numberOfEntriesToRead);
122+
startPoint + (numberOfEntriesToRead - 1));
123123
return readEntries(context.ledger, startPoint, endPoint)
124124
.thenAccept((entries) -> {
125125
Entry lastEntry = entries.get(entries.size() - 1);

pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position sta
7878
return CompletableFuture.completedFuture(Collections.emptyList());
7979
}
8080
long endPoint =
81-
Math.min(context.ledger.getLastAddConfirmed(), startPoint + numberOfEntriesToRead);
81+
Math.min(context.ledger.getLastAddConfirmed(), startPoint + (numberOfEntriesToRead - 1));
8282
return CompactedTopicImpl.readEntries(context.ledger, startPoint, endPoint);
8383
})).whenComplete((result, ex) -> {
8484
if (ex == null) {

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.pulsar.broker.service.Topic;
6262
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
6363
import org.apache.pulsar.broker.service.persistent.SystemTopic;
64+
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
6465
import org.apache.pulsar.client.api.CompressionType;
6566
import org.apache.pulsar.client.api.Consumer;
6667
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -70,11 +71,14 @@
7071
import org.apache.pulsar.client.api.MessageRoutingMode;
7172
import org.apache.pulsar.client.api.Producer;
7273
import org.apache.pulsar.client.api.ProducerBuilder;
74+
import org.apache.pulsar.client.api.PulsarClient;
7375
import org.apache.pulsar.client.api.PulsarClientException;
7476
import org.apache.pulsar.client.api.Reader;
77+
import org.apache.pulsar.client.api.Schema;
7578
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
7679
import org.apache.pulsar.client.api.SubscriptionType;
7780
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
81+
import org.apache.pulsar.client.impl.ConsumerImpl;
7882
import org.apache.pulsar.client.impl.MessageIdImpl;
7983
import org.apache.pulsar.common.naming.NamespaceName;
8084
import org.apache.pulsar.common.naming.TopicName;
@@ -1783,4 +1787,38 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
17831787
Assert.assertNotEquals(ledgerId, -1L);
17841788
});
17851789
}
1790+
1791+
@Test(timeOut = 100000)
1792+
public void testReceiverQueueSize() throws Exception {
1793+
final String topicName = "persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID();
1794+
final String subName = "my-sub";
1795+
final int receiveQueueSize = 1;
1796+
@Cleanup
1797+
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
1798+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
1799+
.enableBatching(false).topic(topicName).create();
1800+
1801+
for (int i = 0; i < 10; i++) {
1802+
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
1803+
}
1804+
producer.flush();
1805+
1806+
admin.topics().triggerCompaction(topicName);
1807+
1808+
Awaitility.await().untilAsserted(() -> {
1809+
assertEquals(admin.topics().compactionStatus(topicName).status,
1810+
LongRunningProcessStatus.Status.SUCCESS);
1811+
});
1812+
1813+
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
1814+
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
1815+
.subscribe();
1816+
1817+
//Give some time to consume
1818+
Awaitility.await()
1819+
.untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
1820+
receiveQueueSize));
1821+
consumer.close();
1822+
producer.close();
1823+
}
17861824
}

pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,25 @@
3636
import org.apache.pulsar.client.impl.MessageImpl;
3737
import org.apache.pulsar.common.policies.data.ClusterData;
3838
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
39+
import org.testng.annotations.BeforeMethod;
3940
import org.testng.annotations.Test;
4041

4142
public class TopicCompactionServiceTest extends CompactorTest {
4243

43-
@Test
44-
public void test() throws PulsarClientException, PulsarAdminException {
44+
@BeforeMethod
45+
@Override
46+
public void setup() throws Exception {
47+
super.setup();
4548
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
4649
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
4750
String defaultTenant = "prop-xyz";
4851
admin.tenants().createTenant(defaultTenant, tenantInfo);
4952
String defaultNamespace = defaultTenant + "/ns1";
5053
admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
54+
}
5155

56+
@Test
57+
public void test() throws PulsarClientException, PulsarAdminException {
5258
String topic = "persistent://prop-xyz/ns1/my-topic";
5359

5460
PulsarTopicCompactionService service = new PulsarTopicCompactionService(topic, bk, () -> compactor);
@@ -114,5 +120,8 @@ public void test() throws PulsarClientException, PulsarAdminException {
114120
assertEquals(data, "B_3");
115121
}
116122
});
123+
124+
List<Entry> entries2 = service.readCompactedEntries(PositionImpl.EARLIEST, 1).join();
125+
assertEquals(entries2.size(), 1);
117126
}
118127
}

0 commit comments

Comments
 (0)