Skip to content

Commit 160a864

Browse files
authored
[improve][broker][PIP-195] Add topicName and cursorName for ledger metadata of bucket snapshot (apache#19802)
1 parent 0e96ded commit 160a864

File tree

8 files changed

+39
-19
lines changed

8 files changed

+39
-19
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public final class LedgerMetadataUtils {
4848
private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo";
4949
private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";
5050

51-
private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKETID = "pulsar/delayedIndexBucketId";
51+
private static final String METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY = "pulsar/delayedIndexBucketKey";
52+
private static final String METADATA_PROPERTY_DELAYED_INDEX_TOPIC = "pulsar/delayedIndexTopic";
53+
private static final String METADATA_PROPERTY_DELAYED_INDEX_CURSOR = "pulsar/delayedIndexCursor";
5254

5355
/**
5456
* Build base metadata for every ManagedLedger.
@@ -108,14 +110,19 @@ public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
108110
/**
109111
* Build additional metadata for a delayed message index bucket.
110112
*
111-
* @param bucketKey key of the delayed message bucket
113+
* @param bucketKey key of the delayed message bucket
114+
* @param topicName name of the topic
115+
* @param cursorName name of the cursor
112116
* @return an immutable map which describes the schema
113117
*/
114-
public static Map<String, byte[]> buildMetadataForDelayedIndexBucket(String bucketKey) {
118+
public static Map<String, byte[]> buildMetadataForDelayedIndexBucket(String bucketKey,
119+
String topicName, String cursorName) {
115120
return Map.of(
116121
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
117122
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET,
118-
METADATA_PROPERTY_DELAYED_INDEX_BUCKETID, bucketKey.getBytes(StandardCharsets.UTF_8)
123+
METADATA_PROPERTY_DELAYED_INDEX_BUCKET_KEY, bucketKey.getBytes(StandardCharsets.UTF_8),
124+
METADATA_PROPERTY_DELAYED_INDEX_TOPIC, topicName.getBytes(StandardCharsets.UTF_8),
125+
METADATA_PROPERTY_DELAYED_INDEX_CURSOR, cursorName.getBytes(StandardCharsets.UTF_8)
119126
);
120127
}
121128

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
5656
@Override
5757
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
5858
List<SnapshotSegment> bucketSnapshotSegments,
59-
String bucketKey) {
60-
return createLedger(bucketKey)
59+
String bucketKey, String topicName, String cursorName) {
60+
return createLedger(bucketKey, topicName, cursorName)
6161
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
6262
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
6363
.thenCompose(__ -> closeLedger(ledgerHandle))
@@ -143,9 +143,10 @@ private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntr
143143
}
144144

145145
@NotNull
146-
private CompletableFuture<LedgerHandle> createLedger(String bucketKey) {
146+
private CompletableFuture<LedgerHandle> createLedger(String bucketKey, String topicName, String cursorName) {
147147
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
148-
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey);
148+
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey,
149+
topicName, cursorName);
149150
bookKeeper.asyncCreateLedger(
150151
config.getManagedLedgerDefaultEnsembleSize(),
151152
config.getManagedLedgerDefaultWriteQuorum(),

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.bookkeeper.mledger.ManagedCursor;
3333
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3434
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
35+
import org.apache.pulsar.common.util.Codec;
3536
import org.roaringbitmap.RoaringBitmap;
3637

3738
@Slf4j
@@ -130,8 +131,11 @@ CompletableFuture<Long> asyncSaveBucketSnapshot(
130131
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
131132
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
132133
final String bucketKey = bucket.bucketKey();
134+
final String cursorName = Codec.decode(cursor.getName());
135+
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
133136
return executeWithRetry(
134-
() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey)
137+
() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey,
138+
topicName, cursorName)
135139
.whenComplete((__, ex) -> {
136140
if (ex != null) {
137141
log.warn("[{}] Failed to create bucket snapshot, bucketKey: {}",

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,16 @@ public interface BucketSnapshotStorage {
2828
/**
2929
* Create a delayed message index bucket snapshot with metadata and bucketSnapshotSegments.
3030
*
31-
* @param snapshotMetadata the metadata of snapshot
31+
* @param snapshotMetadata the metadata of snapshot
3232
* @param bucketSnapshotSegments the list of snapshot segments
33-
* @param bucketKey the key of bucket is used to generate custom storage metadata
33+
* @param bucketKey the key of bucket is used to generate custom storage metadata
34+
* @param topicName the name of topic is used to generate custom storage metadata
35+
* @param cursorName the name of cursor is used to generate custom storage metadata
3436
* @return the future with bucketId(ledgerId).
3537
*/
3638
CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
3739
List<SnapshotSegment> bucketSnapshotSegments,
38-
String bucketKey);
40+
String bucketKey, String topicName, String cursorName);
3941

4042
/**
4143
* Get delayed message index bucket snapshot metadata.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ public class BrokerService implements Closeable {
261261
private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
262262
private final ReadWriteLock lock = new ReentrantReadWriteLock();
263263

264+
@Getter
265+
@VisibleForTesting
264266
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
265267
private final ServerBootstrap defaultServerBootstrap;
266268
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>();

pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,17 @@ protected void cleanup() throws Exception {
5454
bucketSnapshotStorage.close();
5555
}
5656

57+
private static final String TOPIC_NAME = "topicName";
58+
private static final String CURSOR_NAME = "sub";
59+
5760
@Test
5861
public void testCreateSnapshot() throws ExecutionException, InterruptedException {
5962
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
6063
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
6164
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
6265
CompletableFuture<Long> future =
6366
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
64-
bucketSnapshotSegments, UUID.randomUUID().toString());
67+
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
6568
Long bucketId = future.get();
6669
Assert.assertNotNull(bucketId);
6770
}
@@ -90,7 +93,7 @@ public void testGetSnapshot() throws ExecutionException, InterruptedException {
9093

9194
CompletableFuture<Long> future =
9295
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
93-
bucketSnapshotSegments, UUID.randomUUID().toString());
96+
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
9497
Long bucketId = future.get();
9598
Assert.assertNotNull(bucketId);
9699

@@ -129,7 +132,7 @@ public void testGetSnapshotMetadata() throws ExecutionException, InterruptedExce
129132

130133
CompletableFuture<Long> future =
131134
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
132-
bucketSnapshotSegments, UUID.randomUUID().toString());
135+
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
133136
Long bucketId = future.get();
134137
Assert.assertNotNull(bucketId);
135138

@@ -151,7 +154,7 @@ public void testDeleteSnapshot() throws ExecutionException, InterruptedException
151154
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
152155
CompletableFuture<Long> future =
153156
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
154-
bucketSnapshotSegments, UUID.randomUUID().toString());
157+
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
155158
Long bucketId = future.get();
156159
Assert.assertNotNull(bucketId);
157160

@@ -189,7 +192,7 @@ public void testGetBucketSnapshotLength() throws ExecutionException, Interrupted
189192

190193
CompletableFuture<Long> future =
191194
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
192-
bucketSnapshotSegments, UUID.randomUUID().toString());
195+
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
193196
Long bucketId = future.get();
194197
Assert.assertNotNull(bucketId);
195198

pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public void injectDeleteException(Throwable throwable) {
8080

8181
@Override
8282
public CompletableFuture<Long> createBucketSnapshot(
83-
SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments, String bucketKey) {
83+
SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments, String bucketKey,
84+
String topicName, String cursorName) {
8485
Throwable throwable = createExceptionQueue.poll();
8586
if (throwable != null) {
8687
return FutureUtil.failedFuture(throwable);

pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Object[][] provider(Method method) throws Exception {
7979
bucketSnapshotStorage.start();
8080
ManagedCursor cursor = new MockManagedCursor("my_test_cursor");
8181
doReturn(cursor).when(dispatcher).getCursor();
82-
doReturn(cursor.getName()).when(dispatcher).getName();
82+
doReturn("persistent://public/default/testDelay" + " / " + cursor.getName()).when(dispatcher).getName();
8383

8484
final String methodName = method.getName();
8585
return switch (methodName) {

0 commit comments

Comments
 (0)