@@ -60,7 +60,7 @@ public class CompactedTopicImpl implements CompactedTopic {
6060
6161 private final BookKeeper bk ;
6262
63- private PositionImpl compactionHorizon = null ;
63+ private volatile PositionImpl compactionHorizon = null ;
6464 private volatile CompletableFuture <CompactedTopicContext > compactedTopicContext = null ;
6565
6666 public CompactedTopicImpl (BookKeeper bk ) {
@@ -91,7 +91,6 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
9191 int numberOfEntriesToRead ,
9292 boolean isFirstRead ,
9393 ReadEntriesCallback callback , Consumer consumer ) {
94- synchronized (this ) {
9594 PositionImpl cursorPosition ;
9695 if (isFirstRead && MessageId .earliest .equals (consumer .getStartMessageId ())){
9796 cursorPosition = PositionImpl .EARLIEST ;
@@ -101,32 +100,25 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
101100
102101 // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690
103102 ReadEntriesCtx readEntriesCtx = ReadEntriesCtx .create (consumer , DEFAULT_CONSUMER_EPOCH );
104- if (compactionHorizon == null
105- || compactionHorizon .compareTo (cursorPosition ) < 0 ) {
103+
104+ final PositionImpl currentCompactionHorizon = compactionHorizon ;
105+
106+ if (currentCompactionHorizon == null
107+ || currentCompactionHorizon .compareTo (cursorPosition ) < 0 ) {
106108 cursor .asyncReadEntriesOrWait (numberOfEntriesToRead , callback , readEntriesCtx , PositionImpl .LATEST );
107109 } else {
108110 compactedTopicContext .thenCompose (
109111 (context ) -> findStartPoint (cursorPosition , context .ledger .getLastAddConfirmed (), context .cache )
110112 .thenCompose ((startPoint ) -> {
111113 // do not need to read the compaction ledger if it is empty.
112114 // the cursor just needs to be set to the compaction horizon
113- if (startPoint == COMPACT_LEDGER_EMPTY ) {
114- cursor .seek (compactionHorizon .getNext ());
115+ if (startPoint == COMPACT_LEDGER_EMPTY || startPoint == NEWER_THAN_COMPACTED ) {
116+ cursor .seek (currentCompactionHorizon .getNext ());
115117 callback .readEntriesComplete (Collections .emptyList (), readEntriesCtx );
116118 return CompletableFuture .completedFuture (null );
117- }
118- if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon .compareTo (cursorPosition ) < 0 ) {
119- cursor .asyncReadEntriesOrWait (numberOfEntriesToRead , callback , readEntriesCtx ,
120- PositionImpl .LATEST );
121- return CompletableFuture .completedFuture (null );
122119 } else {
123120 long endPoint = Math .min (context .ledger .getLastAddConfirmed (),
124121 startPoint + numberOfEntriesToRead );
125- if (startPoint == NEWER_THAN_COMPACTED ) {
126- cursor .seek (compactionHorizon .getNext ());
127- callback .readEntriesComplete (Collections .emptyList (), readEntriesCtx );
128- return CompletableFuture .completedFuture (null );
129- }
130122 return readEntries (context .ledger , startPoint , endPoint )
131123 .thenAccept ((entries ) -> {
132124 Entry lastEntry = entries .get (entries .size () - 1 );
@@ -142,15 +134,14 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
142134 }))
143135 .exceptionally ((exception ) -> {
144136 if (exception .getCause () instanceof NoSuchElementException ) {
145- cursor .seek (compactionHorizon .getNext ());
137+ cursor .seek (currentCompactionHorizon .getNext ());
146138 callback .readEntriesComplete (Collections .emptyList (), readEntriesCtx );
147139 } else {
148140 callback .readEntriesFailed (new ManagedLedgerException (exception ), readEntriesCtx );
149141 }
150142 return null ;
151143 });
152144 }
153- }
154145 }
155146
156147 static CompletableFuture <Long > findStartPoint (PositionImpl p ,
@@ -318,7 +309,7 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m)
318309 .compare (p .getEntryId (), m .getEntryId ()).result ();
319310 }
320311
321- public synchronized Optional <Position > getCompactionHorizon () {
312+ public Optional <Position > getCompactionHorizon () {
322313 return Optional .ofNullable (this .compactionHorizon );
323314 }
324315
0 commit comments