Skip to content

Commit 3ab420c

Browse files
authored
[fix][broker] Fix message loss during topic compaction (apache#20980)
1 parent 63d9eaf commit 3ab420c

File tree

3 files changed

+75
-11
lines changed

3 files changed

+75
-11
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public class RawBatchConverter {
4444
public static boolean isReadableBatch(RawMessage msg) {
4545
ByteBuf payload = msg.getHeadersAndPayload();
4646
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
47+
return isReadableBatch(metadata);
48+
}
49+
50+
public static boolean isReadableBatch(MessageMetadata metadata) {
4751
return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0;
4852
}
4953

@@ -71,9 +75,9 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
7175
msg.getMessageIdData().getEntryId(),
7276
msg.getMessageIdData().getPartition(),
7377
i);
74-
if (!smm.isCompactedOut()) {
78+
if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
7579
idsAndKeysAndSize.add(ImmutableTriple.of(id,
76-
smm.hasPartitionKey() ? smm.getPartitionKey() : null,
80+
smm.getPartitionKey(),
7781
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
7882
}
7983
singleMessagePayload.release();

pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,26 +122,32 @@ private void phaseOneLoop(RawReader reader,
122122
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
123123

124124
future.thenAcceptAsync(m -> {
125-
try {
125+
try (m) {
126126
MessageId id = m.getMessageId();
127127
boolean deletedMessage = false;
128128
boolean replaceMessage = false;
129129
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
130-
if (RawBatchConverter.isReadableBatch(m)) {
130+
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
131+
if (RawBatchConverter.isReadableBatch(metadata)) {
131132
try {
133+
int numMessagesInBatch = metadata.getNumMessagesInBatch();
134+
int deleteCnt = 0;
132135
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
133136
if (e != null) {
134137
if (e.getRight() > 0) {
135138
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
136-
replaceMessage = old != null;
139+
if (old != null) {
140+
mxBean.addCompactionRemovedEvent(reader.getTopic());
141+
}
137142
} else {
138-
deletedMessage = true;
139143
latestForKey.remove(e.getMiddle());
144+
deleteCnt++;
145+
mxBean.addCompactionRemovedEvent(reader.getTopic());
140146
}
141147
}
142-
if (replaceMessage || deletedMessage) {
143-
mxBean.addCompactionRemovedEvent(reader.getTopic());
144-
}
148+
}
149+
if (deleteCnt == numMessagesInBatch) {
150+
deletedMessage = true;
145151
}
146152
} catch (IOException ioe) {
147153
log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -174,8 +180,6 @@ private void phaseOneLoop(RawReader reader,
174180
lastMessageId,
175181
latestForKey, loopPromise);
176182
}
177-
} finally {
178-
m.close();
179183
}
180184
}, scheduler).exceptionally(ex -> {
181185
loopPromise.completeExceptionally(ex);

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.TimeUnit;
4949
import lombok.Cleanup;
5050
import lombok.SneakyThrows;
51+
import lombok.extern.slf4j.Slf4j;
5152
import org.apache.bookkeeper.client.BookKeeper;
5253
import org.apache.bookkeeper.client.api.OpenBuilder;
5354
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -96,6 +97,7 @@
9697
import org.testng.annotations.Test;
9798

9899
@Test(groups = "broker-impl")
100+
@Slf4j
99101
public class CompactionTest extends MockedPulsarServiceBaseTest {
100102
protected ScheduledExecutorService compactionScheduler;
101103
protected BookKeeper bk;
@@ -553,6 +555,60 @@ public void testBatchMessageIdsDontChange() throws Exception {
553555
}
554556
}
555557

558+
@Test
559+
public void testBatchMessageWithNullValue() throws Exception {
560+
String topic = "persistent://my-property/use/my-ns/my-topic1";
561+
562+
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
563+
.receiverQueueSize(1).readCompacted(true).subscribe().close();
564+
565+
try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
566+
.maxPendingMessages(3)
567+
.enableBatching(true)
568+
.batchingMaxMessages(3)
569+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
570+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
571+
.create()
572+
) {
573+
// batch 1
574+
producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
575+
producer.newMessage().key("key1").value(null).sendAsync();
576+
producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
577+
578+
// batch 2
579+
producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync();
580+
producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync();
581+
producer.newMessage().key("key3").value("my-message-6".getBytes()).send();
582+
583+
// batch 3
584+
producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync();
585+
producer.newMessage().key("key4").value(null).sendAsync();
586+
producer.newMessage().key("key5").value("my-message-9".getBytes()).send();
587+
}
588+
589+
590+
// compact the topic
591+
compact(topic);
592+
593+
// Read messages before compaction to get ids
594+
List<Message<byte[]>> messages = new ArrayList<>();
595+
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
596+
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) {
597+
while (true) {
598+
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
599+
if (message == null) {
600+
break;
601+
}
602+
messages.add(message);
603+
}
604+
}
605+
606+
assertEquals(messages.size(), 3);
607+
assertEquals(messages.get(0).getKey(), "key2");
608+
assertEquals(messages.get(1).getKey(), "key3");
609+
assertEquals(messages.get(2).getKey(), "key5");
610+
}
611+
556612
@Test
557613
public void testWholeBatchCompactedOut() throws Exception {
558614
String topic = "persistent://my-property/use/my-ns/my-topic1";

0 commit comments

Comments
 (0)