3232import io .netty .util .Timer ;
3333import java .time .Clock ;
3434import java .util .ArrayList ;
35+ import java .util .Collections ;
3536import java .util .HashMap ;
3637import java .util .Iterator ;
3738import java .util .List ;
4344import java .util .concurrent .ExecutionException ;
4445import java .util .concurrent .TimeUnit ;
4546import java .util .concurrent .TimeoutException ;
47+ import java .util .stream .Collectors ;
4648import javax .annotation .concurrent .ThreadSafe ;
4749import lombok .Getter ;
4850import lombok .extern .slf4j .Slf4j ;
@@ -93,6 +95,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
9395
    private static final Long INVALID_BUCKET_ID = -1L;

    // Upper bound on how many adjacent immutable buckets may be merged in a single
    // snapshot-merge pass; selectMergedBuckets() falls back to smaller windows (down to 2)
    // when no window of this width qualifies.
    private static final int MAX_MERGE_NUM = 4;

96100 public BucketDelayedDeliveryTracker (PersistentDispatcherMultipleConsumers dispatcher ,
97101 Timer timer , long tickTimeMillis ,
98102 boolean isDelayedDeliveryDeliverAtTimeStrict ,
@@ -341,115 +345,136 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
341345 return true ;
342346 }
343347
344- private synchronized CompletableFuture < Void > asyncMergeBucketSnapshot ( ) {
345- List < ImmutableBucket > values = immutableBuckets . asMapOfRanges (). values (). stream (). toList ( );
348+ private synchronized List < ImmutableBucket > selectMergedBuckets ( final List < ImmutableBucket > values , int mergeNum ) {
349+ checkArgument ( mergeNum < values . size () );
346350 long minNumberMessages = Long .MAX_VALUE ;
347351 long minScheduleTimestamp = Long .MAX_VALUE ;
348352 int minIndex = -1 ;
349- for (int i = 0 ; i + 1 < values .size (); i ++) {
350- ImmutableBucket bucketL = values .get (i );
351- ImmutableBucket bucketR = values .get (i + 1 );
352- // We should skip the bucket which last segment already been load to memory, avoid record replicated index.
353- if (bucketL .lastSegmentEntryId > bucketL .getCurrentSegmentEntryId ()
354- && bucketR .lastSegmentEntryId > bucketR .getCurrentSegmentEntryId ()
355- // Skip the bucket that is merging
356- && !bucketL .merging && !bucketR .merging ){
357- long scheduleTimestamp =
358- Math .min (bucketL .firstScheduleTimestamps .get (bucketL .currentSegmentEntryId + 1 ),
359- bucketR .firstScheduleTimestamps .get (bucketR .currentSegmentEntryId + 1 ));
360- long numberMessages = bucketL .numberBucketDelayedMessages + bucketR .numberBucketDelayedMessages ;
361- if (scheduleTimestamp <= minScheduleTimestamp ) {
362- minScheduleTimestamp = scheduleTimestamp ;
363- if (numberMessages < minNumberMessages ) {
364- minNumberMessages = numberMessages ;
353+ for (int i = 0 ; i + (mergeNum - 1 ) < values .size (); i ++) {
354+ List <ImmutableBucket > immutableBuckets = values .subList (i , i + mergeNum );
355+ if (immutableBuckets .stream ().allMatch (bucket -> {
356+ // We should skip the bucket which last segment already been load to memory,
357+ // avoid record replicated index.
358+ return bucket .lastSegmentEntryId > bucket .currentSegmentEntryId && !bucket .merging ;
359+ })) {
360+ long numberMessages = immutableBuckets .stream ()
361+ .mapToLong (bucket -> bucket .numberBucketDelayedMessages )
362+ .sum ();
363+ if (numberMessages <= minNumberMessages ) {
364+ minNumberMessages = numberMessages ;
365+ long scheduleTimestamp = immutableBuckets .stream ()
366+ .mapToLong (bucket -> bucket .firstScheduleTimestamps .get (bucket .currentSegmentEntryId + 1 ))
367+ .min ().getAsLong ();
368+ if (scheduleTimestamp < minScheduleTimestamp ) {
369+ minScheduleTimestamp = scheduleTimestamp ;
365370 minIndex = i ;
366371 }
367372 }
368373 }
369374 }
370375
371- if (minIndex == -1 ) {
372- log .warn ("[{}] Can't find able merged bucket" , dispatcher .getName ());
373- return CompletableFuture .completedFuture (null );
376+ if (minIndex >= 0 ) {
377+ return values .subList (minIndex , minIndex + MAX_MERGE_NUM );
378+ } else if (mergeNum > 2 ){
379+ return selectMergedBuckets (values , mergeNum - 1 );
380+ } else {
381+ return Collections .emptyList ();
374382 }
383+ }
375384
376- ImmutableBucket immutableBucketA = values .get (minIndex );
377- ImmutableBucket immutableBucketB = values .get (minIndex + 1 );
385+ private synchronized CompletableFuture <Void > asyncMergeBucketSnapshot () {
386+ List <ImmutableBucket > immutableBucketList = immutableBuckets .asMapOfRanges ().values ().stream ().toList ();
387+ List <ImmutableBucket > toBeMergeImmutableBuckets = selectMergedBuckets (immutableBucketList , MAX_MERGE_NUM );
378388
389+ if (toBeMergeImmutableBuckets .isEmpty ()) {
390+ log .warn ("[{}] Can't find able merged buckets" , dispatcher .getName ());
391+ return CompletableFuture .completedFuture (null );
392+ }
393+
394+ final String bucketsStr = toBeMergeImmutableBuckets .stream ().map (Bucket ::bucketKey ).collect (
395+ Collectors .joining ("," )).replaceAll (DELAYED_BUCKET_KEY_PREFIX + "_" , "" );
379396 if (log .isDebugEnabled ()) {
380- log .info ("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}" , dispatcher .getName (),
381- immutableBucketA .bucketKey (), immutableBucketB .bucketKey ());
397+ log .info ("[{}] Merging bucket snapshot, bucketKeys: {}" , dispatcher .getName (), bucketsStr );
382398 }
383399
384- immutableBucketA .merging = true ;
385- immutableBucketB .merging = true ;
386- return asyncMergeBucketSnapshot (immutableBucketA , immutableBucketB ).whenComplete ((__ , ex ) -> {
400+ for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets ) {
401+ immutableBucket .merging = true ;
402+ }
403+ return asyncMergeBucketSnapshot (toBeMergeImmutableBuckets ).whenComplete ((__ , ex ) -> {
387404 synchronized (this ) {
388- immutableBucketA .merging = false ;
389- immutableBucketB .merging = false ;
405+ for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets ) {
406+ immutableBucket .merging = false ;
407+ }
390408 }
391409 if (ex != null ) {
392- log .error ("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey : {}" ,
393- dispatcher .getName (), immutableBucketA . bucketKey (), immutableBucketB . bucketKey () , ex );
410+ log .error ("[{}] Failed to merge bucket snapshot, bucketKeys : {}" ,
411+ dispatcher .getName (), bucketsStr , ex );
394412 } else {
395- log .info ("[{}] Merge bucket snapshot finish, bucketAKey : {}, bucketBKey : {}" ,
396- dispatcher .getName (), immutableBucketA . bucketKey (), immutableBucketB . bucketKey ());
413+ log .info ("[{}] Merge bucket snapshot finish, bucketKeys : {}, bucketNum : {}" ,
414+ dispatcher .getName (), bucketsStr , immutableBuckets . asMapOfRanges (). size ());
397415 }
398416 });
399417 }
400418
401- private synchronized CompletableFuture <Void > asyncMergeBucketSnapshot (ImmutableBucket bucketA ,
402- ImmutableBucket bucketB ) {
403- CompletableFuture < Long > createAFuture = bucketA . getSnapshotCreateFuture ().orElse (NULL_LONG_PROMISE );
404- CompletableFuture < Long > createBFuture = bucketB . getSnapshotCreateFuture (). orElse ( NULL_LONG_PROMISE );
419+ private synchronized CompletableFuture <Void > asyncMergeBucketSnapshot (List < ImmutableBucket > buckets ) {
420+ List < CompletableFuture < Long >> createFutures =
421+ buckets . stream (). map ( bucket -> bucket . getSnapshotCreateFuture ().orElse (NULL_LONG_PROMISE ))
422+ . toList ( );
405423
406- return CompletableFuture . allOf ( createAFuture , createBFuture ).thenCompose (bucketId -> {
407- if (INVALID_BUCKET_ID . equals ( createAFuture . join ()) || INVALID_BUCKET_ID .equals (createBFuture .join ())) {
424+ return FutureUtil . waitForAll ( createFutures ).thenCompose (bucketId -> {
425+ if (createFutures . stream (). anyMatch ( future -> INVALID_BUCKET_ID .equals (future .join () ))) {
408426 return FutureUtil .failedFuture (new RuntimeException ("Can't merge buckets due to bucket create failed" ));
409427 }
410428
411- CompletableFuture <List <DelayedMessageIndexBucketSnapshotFormat .SnapshotSegment >> futureA =
412- bucketA .getRemainSnapshotSegment ();
413- CompletableFuture <List <DelayedMessageIndexBucketSnapshotFormat .SnapshotSegment >> futureB =
414- bucketB .getRemainSnapshotSegment ();
415- return futureA .thenCombine (futureB , CombinedSegmentDelayedIndexQueue ::wrap )
429+ List <CompletableFuture <List <DelayedMessageIndexBucketSnapshotFormat .SnapshotSegment >>> getRemainFutures =
430+ buckets .stream ().map (ImmutableBucket ::getRemainSnapshotSegment ).toList ();
431+
432+ return FutureUtil .waitForAll (getRemainFutures )
433+ .thenApply (__ -> {
434+ return CombinedSegmentDelayedIndexQueue .wrap (
435+ getRemainFutures .stream ().map (CompletableFuture ::join ).toList ());
436+ })
416437 .thenAccept (combinedDelayedIndexQueue -> {
417438 synchronized (BucketDelayedDeliveryTracker .this ) {
418439 Pair <ImmutableBucket , DelayedIndex > immutableBucketDelayedIndexPair =
419440 lastMutableBucket .createImmutableBucketAndAsyncPersistent (
420441 timeStepPerBucketSnapshotSegmentInMillis ,
421442 maxIndexesPerBucketSnapshotSegment ,
422- sharedBucketPriorityQueue , combinedDelayedIndexQueue , bucketA .startLedgerId ,
423- bucketB .endLedgerId );
443+ sharedBucketPriorityQueue , combinedDelayedIndexQueue ,
444+ buckets .get (0 ).startLedgerId ,
445+ buckets .get (buckets .size () - 1 ).endLedgerId );
424446
425447 // Merge bit map to new bucket
426- Map <Long , RoaringBitmap > delayedIndexBitMapA = bucketA .getDelayedIndexBitMap ();
427- Map <Long , RoaringBitmap > delayedIndexBitMapB = bucketB .getDelayedIndexBitMap ();
428- Map <Long , RoaringBitmap > delayedIndexBitMap = new HashMap <>(delayedIndexBitMapA );
429- delayedIndexBitMapB .forEach ((ledgerId , bitMapB ) -> {
430- delayedIndexBitMap .compute (ledgerId , (k , bitMapA ) -> {
431- if (bitMapA == null ) {
432- return bitMapB ;
433- }
434-
435- bitMapA .or (bitMapB );
436- return bitMapA ;
448+ Map <Long , RoaringBitmap > delayedIndexBitMap =
449+ new HashMap <>(buckets .get (0 ).getDelayedIndexBitMap ());
450+ for (int i = 1 ; i < buckets .size (); i ++) {
451+ buckets .get (i ).delayedIndexBitMap .forEach ((ledgerId , bitMapB ) -> {
452+ delayedIndexBitMap .compute (ledgerId , (k , bitMap ) -> {
453+ if (bitMap == null ) {
454+ return bitMapB ;
455+ }
456+
457+ bitMap .or (bitMapB );
458+ return bitMap ;
459+ });
437460 });
438- });
461+ }
439462 immutableBucketDelayedIndexPair .getLeft ().setDelayedIndexBitMap (delayedIndexBitMap );
440463
441464 afterCreateImmutableBucket (immutableBucketDelayedIndexPair );
442465
443466 immutableBucketDelayedIndexPair .getLeft ().getSnapshotCreateFuture ()
444467 .orElse (NULL_LONG_PROMISE ).thenCompose (___ -> {
445- CompletableFuture <Void > removeAFuture = bucketA .asyncDeleteBucketSnapshot ();
446- CompletableFuture <Void > removeBFuture = bucketB .asyncDeleteBucketSnapshot ();
447- return CompletableFuture .allOf (removeAFuture , removeBFuture );
468+ List <CompletableFuture <Void >> removeFutures =
469+ buckets .stream ().map (ImmutableBucket ::asyncDeleteBucketSnapshot )
470+ .toList ();
471+ return FutureUtil .waitForAll (removeFutures );
448472 });
449473
450- Map <Range <Long >, ImmutableBucket > immutableBucketMap = immutableBuckets .asMapOfRanges ();
451- immutableBucketMap .remove (Range .closed (bucketA .startLedgerId , bucketA .endLedgerId ));
452- immutableBucketMap .remove (Range .closed (bucketB .startLedgerId , bucketB .endLedgerId ));
474+ for (ImmutableBucket bucket : buckets ) {
475+ immutableBuckets .asMapOfRanges ()
476+ .remove (Range .closed (bucket .startLedgerId , bucket .endLedgerId ));
477+ }
453478 }
454479 });
455480 });
@@ -593,14 +618,12 @@ public boolean shouldPauseAllDeliveries() {
593618
594619 @ Override
595620 public synchronized CompletableFuture <Void > clear () {
596- return cleanImmutableBuckets (true ).thenRun (() -> {
597- synchronized (this ) {
598- sharedBucketPriorityQueue .clear ();
599- lastMutableBucket .clear ();
600- snapshotSegmentLastIndexTable .clear ();
601- numberDelayedMessages = 0 ;
602- }
603- });
621+ CompletableFuture <Void > future = cleanImmutableBuckets ();
622+ sharedBucketPriorityQueue .clear ();
623+ lastMutableBucket .clear ();
624+ snapshotSegmentLastIndexTable .clear ();
625+ numberDelayedMessages = 0 ;
626+ return future ;
604627 }
605628
606629 @ Override
@@ -609,30 +632,24 @@ public synchronized void close() {
609632 lastMutableBucket .close ();
610633 sharedBucketPriorityQueue .close ();
611634 try {
612- cleanImmutableBuckets (false ).get (AsyncOperationTimeoutSeconds , TimeUnit .SECONDS );
635+ List <CompletableFuture <Long >> completableFutures = immutableBuckets .asMapOfRanges ().values ().stream ()
636+ .map (bucket -> bucket .getSnapshotCreateFuture ().orElse (NULL_LONG_PROMISE )).toList ();
637+ FutureUtil .waitForAll (completableFutures ).get (AsyncOperationTimeoutSeconds , TimeUnit .SECONDS );
613638 } catch (Exception e ) {
614639 log .warn ("[{}] Failed wait to snapshot generate" , dispatcher .getName (), e );
615640 }
616641 }
617642
618- private CompletableFuture <Void > cleanImmutableBuckets (boolean delete ) {
619- if (immutableBuckets != null ) {
620- List <CompletableFuture <Void >> futures = new ArrayList <>();
621- Iterator <ImmutableBucket > iterator = immutableBuckets .asMapOfRanges ().values ().iterator ();
622- while (iterator .hasNext ()) {
623- ImmutableBucket bucket = iterator .next ();
624- if (delete ) {
625- futures .add (bucket .clear ());
626- } else {
627- bucket .getSnapshotCreateFuture ().ifPresent (future -> futures .add (future .thenApply (x -> null )));
628- }
629- numberDelayedMessages -= bucket .getNumberBucketDelayedMessages ();
630- iterator .remove ();
631- }
632- return FutureUtil .waitForAll (futures );
633- } else {
634- return CompletableFuture .completedFuture (null );
643+ private CompletableFuture <Void > cleanImmutableBuckets () {
644+ List <CompletableFuture <Void >> futures = new ArrayList <>();
645+ Iterator <ImmutableBucket > iterator = immutableBuckets .asMapOfRanges ().values ().iterator ();
646+ while (iterator .hasNext ()) {
647+ ImmutableBucket bucket = iterator .next ();
648+ futures .add (bucket .clear ());
649+ numberDelayedMessages -= bucket .getNumberBucketDelayedMessages ();
650+ iterator .remove ();
635651 }
652+ return FutureUtil .waitForAll (futures );
636653 }
637654
638655 private boolean removeIndexBit (long ledgerId , long entryId ) {
0 commit comments