2020
2121import com .google .protobuf .InvalidProtocolBufferException ;
2222import io .netty .buffer .ByteBuf ;
23+ import io .netty .buffer .Unpooled ;
2324import java .util .ArrayList ;
2425import java .util .Enumeration ;
2526import java .util .List ;
3839import org .apache .pulsar .broker .ServiceConfiguration ;
3940import org .apache .pulsar .broker .delayed .proto .SnapshotMetadata ;
4041import org .apache .pulsar .broker .delayed .proto .SnapshotSegment ;
42+ import org .apache .pulsar .common .allocator .PulsarByteBufAllocator ;
4143import org .apache .pulsar .common .util .FutureUtil ;
4244
4345@ Slf4j
@@ -60,8 +62,9 @@ public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
6062 public CompletableFuture <Long > createBucketSnapshot (SnapshotMetadata snapshotMetadata ,
6163 List <SnapshotSegment > bucketSnapshotSegments ,
6264 String bucketKey , String topicName , String cursorName ) {
65+ ByteBuf metadataByteBuf = Unpooled .wrappedBuffer (snapshotMetadata .toByteArray ());
6366 return createLedger (bucketKey , topicName , cursorName )
64- .thenCompose (ledgerHandle -> addEntry (ledgerHandle , snapshotMetadata . toByteArray () )
67+ .thenCompose (ledgerHandle -> addEntry (ledgerHandle , metadataByteBuf )
6568 .thenCompose (__ -> addSnapshotSegments (ledgerHandle , bucketSnapshotSegments ))
6669 .thenCompose (__ -> closeLedger (ledgerHandle ))
6770 .thenApply (__ -> ledgerHandle .getId ()));
@@ -117,19 +120,32 @@ public void close() throws Exception {
117120 private CompletableFuture <Void > addSnapshotSegments (LedgerHandle ledgerHandle ,
118121 List <SnapshotSegment > bucketSnapshotSegments ) {
119122 List <CompletableFuture <Void >> addFutures = new ArrayList <>();
123+ ByteBuf byteBuf ;
120124 for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments ) {
121- addFutures .add (addEntry (ledgerHandle , bucketSnapshotSegment .toByteArray ()));
125+ byteBuf = PulsarByteBufAllocator .DEFAULT .directBuffer (bucketSnapshotSegment .getSerializedSize ());
126+ try {
127+ bucketSnapshotSegment .writeTo (byteBuf );
128+ } catch (Exception e ){
129+ byteBuf .release ();
130+ throw e ;
131+ }
132+ addFutures .add (addEntry (ledgerHandle , byteBuf ));
122133 }
123134
124135 return FutureUtil .waitForAll (addFutures );
125136 }
126137
127138 private SnapshotMetadata parseSnapshotMetadataEntry (LedgerEntry ledgerEntry ) {
139+ ByteBuf entryBuffer = null ;
128140 try {
129- ByteBuf entryBuffer = ledgerEntry .getEntryBuffer ();
141+ entryBuffer = ledgerEntry .getEntryBuffer ();
130142 return SnapshotMetadata .parseFrom (entryBuffer .nioBuffer ());
131143 } catch (InvalidProtocolBufferException e ) {
132144 throw new BucketSnapshotSerializationException (e );
145+ } finally {
146+ if (entryBuffer != null ) {
147+ entryBuffer .release ();
148+ }
133149 }
134150 }
135151
@@ -139,7 +155,11 @@ private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntr
139155 LedgerEntry ledgerEntry = entryEnumeration .nextElement ();
140156 SnapshotSegment snapshotSegment = new SnapshotSegment ();
141157 ByteBuf entryBuffer = ledgerEntry .getEntryBuffer ();
142- snapshotSegment .parseFrom (entryBuffer , entryBuffer .readableBytes ());
158+ try {
159+ snapshotSegment .parseFrom (entryBuffer , entryBuffer .readableBytes ());
160+ } finally {
161+ entryBuffer .release ();
162+ }
143163 snapshotMetadataList .add (snapshotSegment );
144164 }
145165 return snapshotMetadataList ;
@@ -208,7 +228,7 @@ private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
208228 return future ;
209229 }
210230
211- private CompletableFuture <Void > addEntry (LedgerHandle ledgerHandle , byte [] data ) {
231+ private CompletableFuture <Void > addEntry (LedgerHandle ledgerHandle , ByteBuf data ) {
212232 final CompletableFuture <Void > future = new CompletableFuture <>();
213233 ledgerHandle .asyncAddEntry (data ,
214234 (rc , handle , entryId , ctx ) -> {
0 commit comments