Skip to content

Commit 4c69584

Browse files
authored
[feat][broker][PIP-278] Support pluggable topic compaction service - part2 (apache#20718)
1 parent 07ec84e commit 4c69584

27 files changed

+434
-169
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,11 @@
147147
import org.apache.pulsar.common.util.Reflections;
148148
import org.apache.pulsar.common.util.ThreadDumpUtil;
149149
import org.apache.pulsar.common.util.netty.EventLoopUtil;
150+
import org.apache.pulsar.compaction.CompactionServiceFactory;
150151
import org.apache.pulsar.compaction.Compactor;
152+
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
151153
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
152-
import org.apache.pulsar.compaction.TwoPhaseCompactor;
154+
import org.apache.pulsar.compaction.TopicCompactionService;
153155
import org.apache.pulsar.functions.worker.ErrorNotifier;
154156
import org.apache.pulsar.functions.worker.WorkerConfig;
155157
import org.apache.pulsar.functions.worker.WorkerService;
@@ -198,7 +200,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
198200
private WebSocketService webSocketService = null;
199201
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
200202
private BookKeeperClientFactory bkClientFactory;
201-
private Compactor compactor;
203+
protected CompactionServiceFactory compactionServiceFactory;
202204
private StrategicTwoPhaseCompactor strategicCompactor;
203205
private ResourceUsageTransportManager resourceUsageTransportManager;
204206
private ResourceGroupService resourceGroupServiceManager;
@@ -452,6 +454,15 @@ public CompletableFuture<Void> closeAsync() {
452454

453455
resetMetricsServlet();
454456

457+
if (this.compactionServiceFactory != null) {
458+
try {
459+
this.compactionServiceFactory.close();
460+
} catch (Exception e) {
461+
LOG.warn("CompactionServiceFactory closing failed {}", e.getMessage());
462+
}
463+
this.compactionServiceFactory = null;
464+
}
465+
455466
if (this.webSocketService != null) {
456467
this.webSocketService.close();
457468
}
@@ -813,6 +824,9 @@ public void start() throws PulsarServerException {
813824
this.brokerServiceUrl = brokerUrl(config);
814825
this.brokerServiceUrlTls = brokerUrlTls(config);
815826

827+
if (this.compactionServiceFactory == null) {
828+
this.compactionServiceFactory = loadCompactionServiceFactory();
829+
}
816830

817831
if (null != this.webSocketService) {
818832
ClusterDataImpl clusterData = ClusterDataImpl.builder()
@@ -1475,25 +1489,16 @@ public synchronized ScheduledExecutorService getCompactorExecutor() {
14751489
return this.compactorExecutor;
14761490
}
14771491

1478-
// only public so mockito can mock it
1479-
public Compactor newCompactor() throws PulsarServerException {
1480-
return new TwoPhaseCompactor(this.getConfiguration(),
1481-
getClient(), getBookKeeperClient(),
1482-
getCompactorExecutor());
1483-
}
1484-
1485-
public synchronized Compactor getCompactor() throws PulsarServerException {
1486-
if (this.compactor == null) {
1487-
this.compactor = newCompactor();
1488-
}
1489-
return this.compactor;
1490-
}
1491-
14921492
// This method is used for metrics, which is allowed to as null
14931493
// Because it's no operation on the compactor, so let's remove the synchronized on this method
14941494
// to avoid unnecessary lock competition.
1495+
// Only the pulsar's compaction service provides the compaction stats. The compaction service plugin,
1496+
// it should be done by the plugin itself to expose the compaction metrics.
14951497
public Compactor getNullableCompactor() {
1496-
return this.compactor;
1498+
if (this.compactionServiceFactory instanceof PulsarCompactionServiceFactory pulsarCompactedServiceFactory) {
1499+
return pulsarCompactedServiceFactory.getNullableCompactor();
1500+
}
1501+
return null;
14971502
}
14981503

14991504
public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
@@ -1911,4 +1916,22 @@ public void shutdownNow() {
19111916
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
19121917
return new BrokerService(pulsar, ioEventLoopGroup);
19131918
}
1919+
1920+
private CompactionServiceFactory loadCompactionServiceFactory() {
1921+
String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName();
1922+
var compactionServiceFactory =
1923+
Reflections.createInstance(compactionServiceFactoryClassName, CompactionServiceFactory.class,
1924+
Thread.currentThread().getContextClassLoader());
1925+
compactionServiceFactory.initialize(this).join();
1926+
return compactionServiceFactory;
1927+
}
1928+
1929+
public CompletableFuture<TopicCompactionService> newTopicCompactionService(String topic) {
1930+
try {
1931+
CompactionServiceFactory compactionServiceFactory = this.getCompactionServiceFactory();
1932+
return compactionServiceFactory.newTopicCompactionService(topic);
1933+
} catch (Throwable e) {
1934+
return CompletableFuture.failedFuture(e);
1935+
}
1936+
}
19141937
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 56 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2077,64 +2077,73 @@ private void getLargestBatchIndexWhenPossible(
20772077

20782078
// If it's not pointing to a valid entry, respond messageId of the current position.
20792079
// If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
2080-
Optional<Position> compactionHorizon = persistentTopic.getCompactedTopic().getCompactionHorizon();
2081-
if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
2082-
&& lastPosition.compareTo((PositionImpl) compactionHorizon.get()) <= 0)) {
2083-
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
2084-
markDeletePosition);
2085-
return;
2086-
}
2080+
CompletableFuture<Position> compactionHorizonFuture =
2081+
persistentTopic.getTopicCompactionService().getLastCompactedPosition();
20872082

2088-
// For a valid position, we read the entry out and parse the batch size from its metadata.
2089-
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
2090-
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
2091-
@Override
2092-
public void readEntryComplete(Entry entry, Object ctx) {
2093-
entryFuture.complete(entry);
2083+
compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
2084+
if (ex != null) {
2085+
log.error("Failed to get compactionHorizon.", ex);
2086+
writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, ex.getMessage()));
2087+
return;
20942088
}
20952089

2096-
@Override
2097-
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
2098-
entryFuture.completeExceptionally(exception);
2090+
if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
2091+
&& lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) {
2092+
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
2093+
markDeletePosition);
2094+
return;
20992095
}
2100-
}, null);
21012096

2102-
CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
2103-
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
2104-
int batchSize = metadata.getNumMessagesInBatch();
2105-
entry.release();
2106-
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
2107-
});
2108-
2109-
batchSizeFuture.whenComplete((batchSize, e) -> {
2110-
if (e != null) {
2111-
if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
2112-
handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex,
2113-
markDeletePosition);
2114-
} else {
2115-
writeAndFlush(Commands.newError(
2116-
requestId, ServerError.MetadataError,
2117-
"Failed to get batch size for entry " + e.getMessage()));
2097+
// For a valid position, we read the entry out and parse the batch size from its metadata.
2098+
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
2099+
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
2100+
@Override
2101+
public void readEntryComplete(Entry entry, Object ctx) {
2102+
entryFuture.complete(entry);
21182103
}
2119-
} else {
2120-
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
21212104

2122-
if (log.isDebugEnabled()) {
2123-
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
2124-
topic.getName(), subscriptionName, lastPosition, partitionIndex);
2105+
@Override
2106+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
2107+
entryFuture.completeExceptionally(exception);
21252108
}
2109+
}, null);
21262110

2127-
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
2128-
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
2129-
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
2130-
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
2131-
}
2111+
CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
2112+
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
2113+
int batchSize = metadata.getNumMessagesInBatch();
2114+
entry.release();
2115+
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
2116+
});
2117+
2118+
batchSizeFuture.whenComplete((batchSize, e) -> {
2119+
if (e != null) {
2120+
if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
2121+
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
2122+
markDeletePosition);
2123+
} else {
2124+
writeAndFlush(Commands.newError(
2125+
requestId, ServerError.MetadataError,
2126+
"Failed to get batch size for entry " + e.getMessage()));
2127+
}
2128+
} else {
2129+
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
2130+
2131+
if (log.isDebugEnabled()) {
2132+
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
2133+
topic.getName(), subscriptionName, lastPosition, partitionIndex);
2134+
}
2135+
2136+
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
2137+
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
2138+
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
2139+
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
2140+
}
2141+
});
21322142
});
21332143
}
2134-
2135-
private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long requestId,
2136-
int partitionIndex, PositionImpl markDeletePosition) {
2137-
persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
2144+
private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId,
2145+
int partitionIndex, PositionImpl markDeletePosition) {
2146+
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
21382147
if (entry != null) {
21392148
try {
21402149
// in this case, all the data has been compacted, so return the last position

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@
5050
import org.apache.pulsar.broker.service.Subscription;
5151
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
5252
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
53+
import org.apache.pulsar.client.api.MessageId;
5354
import org.apache.pulsar.client.impl.Backoff;
5455
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5556
import org.apache.pulsar.common.util.Codec;
57+
import org.apache.pulsar.compaction.CompactedTopicUtils;
5658
import org.slf4j.Logger;
5759
import org.slf4j.LoggerFactory;
5860

@@ -347,8 +349,9 @@ protected void readMoreEntries(Consumer consumer) {
347349
}
348350
havePendingRead = true;
349351
if (consumer.readCompacted()) {
350-
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
351-
this, consumer);
352+
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
353+
CompactedTopicUtils.readCompactedEntries(topic.getTopicCompactionService(), cursor, messagesToRead,
354+
readFromEarliest, this, consumer);
352355
} else {
353356
ReadEntriesCtx readEntriesCtx =
354357
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -772,16 +772,26 @@ private void resetCursor(Position finalPosition, CompletableFuture<Void> future)
772772
log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
773773
topicName, subName);
774774

775-
try {
776-
boolean forceReset = false;
777-
if (topic.getCompactedTopic() != null && topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
778-
PositionImpl horizon = (PositionImpl) topic.getCompactedTopic().getCompactionHorizon().get();
775+
CompletableFuture<Boolean> forceReset = new CompletableFuture<>();
776+
if (topic.getTopicCompactionService() == null) {
777+
forceReset.complete(false);
778+
} else {
779+
topic.getTopicCompactionService().getLastCompactedPosition().thenAccept(lastCompactedPosition -> {
779780
PositionImpl resetTo = (PositionImpl) finalPosition;
780-
if (horizon.compareTo(resetTo) >= 0) {
781-
forceReset = true;
781+
if (lastCompactedPosition != null && resetTo.compareTo(lastCompactedPosition.getLedgerId(),
782+
lastCompactedPosition.getEntryId()) <= 0) {
783+
forceReset.complete(true);
784+
} else {
785+
forceReset.complete(false);
782786
}
783-
}
784-
cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() {
787+
}).exceptionally(ex -> {
788+
forceReset.completeExceptionally(ex);
789+
return null;
790+
});
791+
}
792+
793+
forceReset.thenAccept(forceResetValue -> {
794+
cursor.asyncResetCursor(finalPosition, forceResetValue, new AsyncCallbacks.ResetCursorCallback() {
785795
@Override
786796
public void resetComplete(Object ctx) {
787797
if (log.isDebugEnabled()) {
@@ -811,11 +821,12 @@ public void resetFailed(ManagedLedgerException exception, Object ctx) {
811821
}
812822
}
813823
});
814-
} catch (Exception e) {
824+
}).exceptionally((e) -> {
815825
log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
816826
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
817827
future.completeExceptionally(new BrokerServiceException(e));
818-
}
828+
return null;
829+
});
819830
});
820831
}
821832

0 commit comments

Comments
 (0)