Skip to content

Commit b88c48a

Browse files
authored
[improve][broker] Copy BrokerEntryMetadata when rebatchMessage (apache#20337)
1 parent d565c95 commit b88c48a

File tree

2 files changed

+59
-2
lines changed

2 files changed

+59
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.pulsar.client.impl;
2020

2121
import static com.google.common.base.Preconditions.checkArgument;
22+
import static org.apache.pulsar.common.protocol.Commands.magicBrokerEntryMetadata;
2223
import io.netty.buffer.ByteBuf;
24+
import io.netty.buffer.CompositeByteBuf;
2325
import io.netty.buffer.Unpooled;
2426
import java.io.IOException;
2527
import java.util.ArrayList;
@@ -92,6 +94,14 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
9294
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
9395

9496
ByteBuf payload = msg.getHeadersAndPayload();
97+
int readerIndex = payload.readerIndex();
98+
ByteBuf brokerMeta = null;
99+
if (payload.getShort(readerIndex) == magicBrokerEntryMetadata) {
100+
payload.skipBytes(Short.BYTES);
101+
int brokerEntryMetadataSize = payload.readInt();
102+
payload.readerIndex(readerIndex);
103+
brokerMeta = payload.readRetainedSlice(brokerEntryMetadataSize + Short.BYTES + Integer.BYTES);
104+
}
95105
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
96106
ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());
97107

@@ -139,8 +149,15 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
139149

140150
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
141151
metadata, compressedPayload);
142-
Optional<RawMessage> result = Optional.of(new RawMessageImpl(msg.getMessageIdData(),
143-
metadataAndPayload));
152+
153+
if (brokerMeta != null) {
154+
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeDirectBuffer();
155+
compositeByteBuf.addComponents(true, brokerMeta, metadataAndPayload);
156+
metadataAndPayload = compositeByteBuf;
157+
}
158+
159+
Optional<RawMessage> result =
160+
Optional.of(new RawMessageImpl(msg.getMessageIdData(), metadataAndPayload));
144161
metadataAndPayload.release();
145162
compressedPayload.release();
146163
return result;

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.pulsar.client.api.Producer;
4040
import org.apache.pulsar.client.api.RawMessage;
4141
import org.apache.pulsar.client.api.RawReader;
42+
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
4243
import org.apache.pulsar.common.api.proto.MessageMetadata;
4344
import org.apache.pulsar.common.policies.data.ClusterData;
4445
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -57,6 +58,11 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
5758
@BeforeMethod
5859
@Override
5960
public void setup() throws Exception {
61+
conf.setBrokerEntryMetadataInterceptors(org.assertj.core.util.Sets.newTreeSet(
62+
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
63+
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
64+
));
65+
conf.setExposingBrokerEntryMetadataToClientEnabled(true);
6066
super.internalSetup();
6167

6268
admin.clusters().createCluster("test",
@@ -384,6 +390,40 @@ public void testBatchingRebatch() throws Exception {
384390
}
385391
}
386392

393+
@Test
394+
public void testBatchingRebatchWithBrokerEntryMetadata() throws Exception {
395+
String topic = "persistent://my-property/my-ns/my-raw-topic";
396+
397+
try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
398+
.maxPendingMessages(3)
399+
.enableBatching(true)
400+
.batchingMaxMessages(3)
401+
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
402+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
403+
.create()) {
404+
producer.newMessage().key("key1").value("my-content-1".getBytes()).sendAsync();
405+
producer.newMessage().key("key2").value("my-content-2".getBytes()).sendAsync();
406+
producer.newMessage().key("key3").value("my-content-3".getBytes()).send();
407+
}
408+
409+
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
410+
try (RawMessage m1 = reader.readNextAsync().get()) {
411+
RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
412+
BrokerEntryMetadata brokerEntryMetadata =
413+
Commands.parseBrokerEntryMetadataIfExist(m2.getHeadersAndPayload());
414+
Assert.assertNotNull(brokerEntryMetadata);
415+
Assert.assertEquals(brokerEntryMetadata.getIndex(), 2);
416+
Assert.assertTrue(brokerEntryMetadata.getBrokerTimestamp() < System.currentTimeMillis());
417+
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeys = RawBatchConverter.extractIdsAndKeysAndSize(m2);
418+
Assert.assertEquals(idsAndKeys.size(), 1);
419+
Assert.assertEquals(idsAndKeys.get(0).getMiddle(), "key2");
420+
m2.close();
421+
Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
422+
} finally {
423+
reader.closeAsync().get();
424+
}
425+
}
426+
387427
@Test
388428
public void testAcknowledgeWithProperties() throws Exception {
389429
int numKeys = 10;

0 commit comments

Comments
 (0)