Skip to content

Commit 980dfc8

Browse files
authored
[improve][cli] Add some checks for topic-level setOffloadPolicies (apache#20943)
1 parent 499eef6 commit 980dfc8

File tree

8 files changed

+89
-51
lines changed

8 files changed

+89
-51
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.pulsar.common.policies.data.BundlesData;
5959
import org.apache.pulsar.common.policies.data.EntryFilters;
6060
import org.apache.pulsar.common.policies.data.NamespaceOperation;
61+
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
6162
import org.apache.pulsar.common.policies.data.PersistencePolicies;
6263
import org.apache.pulsar.common.policies.data.Policies;
6364
import org.apache.pulsar.common.policies.data.PolicyName;
@@ -852,4 +853,26 @@ protected List<String> filterSystemTopic(List<String> topics, boolean includeSys
852853
protected AuthorizationService getAuthorizationService() {
853854
return pulsar().getBrokerService().getAuthorizationService();
854855
}
856+
857+
protected void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
858+
if (offloadPolicies == null) {
859+
log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
860+
clientAppId(), namespaceName);
861+
throw new RestException(Status.PRECONDITION_FAILED,
862+
"The offloadPolicies must be specified for namespace offload.");
863+
}
864+
if (!offloadPolicies.driverSupported()) {
865+
log.warn("[{}] Failed to update offload configuration for namespace {}: "
866+
+ "driver is not supported, support value: {}",
867+
clientAppId(), namespaceName, OffloadPoliciesImpl.getSupportedDriverNames());
868+
throw new RestException(Status.PRECONDITION_FAILED,
869+
"The driver is not supported, support value: " + OffloadPoliciesImpl.getSupportedDriverNames());
870+
}
871+
if (!offloadPolicies.bucketValid()) {
872+
log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
873+
clientAppId(), namespaceName);
874+
throw new RestException(Status.PRECONDITION_FAILED,
875+
"The bucket must be specified for namespace offload.");
876+
}
877+
}
855878
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2316,28 +2316,6 @@ protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) {
23162316
}
23172317
}
23182318

2319-
private void validateOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
2320-
if (offloadPolicies == null) {
2321-
log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
2322-
clientAppId(), namespaceName);
2323-
throw new RestException(Status.PRECONDITION_FAILED,
2324-
"The offloadPolicies must be specified for namespace offload.");
2325-
}
2326-
if (!offloadPolicies.driverSupported()) {
2327-
log.warn("[{}] Failed to update offload configuration for namespace {}: "
2328-
+ "driver is not supported, support value: {}",
2329-
clientAppId(), namespaceName, OffloadPoliciesImpl.getSupportedDriverNames());
2330-
throw new RestException(Status.PRECONDITION_FAILED,
2331-
"The driver is not supported, support value: " + OffloadPoliciesImpl.getSupportedDriverNames());
2332-
}
2333-
if (!offloadPolicies.bucketValid()) {
2334-
log.warn("[{}] Failed to update offload configuration for namespace {}: bucket must be specified",
2335-
clientAppId(), namespaceName);
2336-
throw new RestException(Status.PRECONDITION_FAILED,
2337-
"The bucket must be specified for namespace offload.");
2338-
}
2339-
}
2340-
23412319
protected void internalRemoveMaxTopicsPerNamespace() {
23422320
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE);
23432321
internalSetMaxTopicsPerNamespace(null);

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -940,8 +940,8 @@ protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(bool
940940
});
941941
}
942942

943-
protected CompletableFuture<Void> internalSetOffloadPolicies
944-
(OffloadPoliciesImpl offloadPolicies, boolean isGlobal) {
943+
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies,
944+
boolean isGlobal) {
945945
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
946946
.thenCompose(op -> {
947947
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
380380
@ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
381381
validateTopicName(tenant, namespace, encodedTopic);
382382
preValidation(authoritative)
383+
.thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
383384
.thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
384385
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
385386
.exceptionally(ex -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ public void testOffloadPoliciesApi() throws Exception {
213213
OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl) admin.topics().getOffloadPolicies(topicName);
214214
assertNull(offloadPolicies);
215215
OffloadPoliciesImpl offload = new OffloadPoliciesImpl();
216+
offload.setManagedLedgerOffloadDriver("S3");
217+
offload.setManagedLedgerOffloadBucket("bucket");
216218
String path = "fileSystemPath";
217219
offload.setFileSystemProfilePath(path);
218220
admin.topics().setOffloadPolicies(topicName, offload);
@@ -404,12 +406,13 @@ private void testOffload(boolean isPartitioned) throws Exception {
404406
//3 construct a topic level offloadPolicies
405407
OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
406408
offloadPolicies.setOffloadersDirectory(".");
407-
offloadPolicies.setManagedLedgerOffloadDriver("mock");
409+
offloadPolicies.setManagedLedgerOffloadDriver("S3");
410+
offloadPolicies.setManagedLedgerOffloadBucket("bucket");
408411
offloadPolicies.setManagedLedgerOffloadPrefetchRounds(10);
409412
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(1024L);
410413

411414
LedgerOffloader topicOffloader = mock(LedgerOffloader.class);
412-
when(topicOffloader.getOffloadDriverName()).thenReturn("mock");
415+
when(topicOffloader.getOffloadDriverName()).thenReturn("S3");
413416
doReturn(topicOffloader).when(pulsar).createManagedLedgerOffloader(any());
414417

415418
//4 set topic level offload policies
@@ -423,18 +426,18 @@ private void testOffload(boolean isPartitioned) throws Exception {
423426
.getTopic(TopicName.get(topicName).getPartition(i).toString(), false).get().get();
424427
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
425428
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
426-
, "mock");
429+
, "S3");
427430
}
428431
} else {
429432
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
430433
.getTopic(topicName, false).get().get();
431434
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
432435
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
433-
, "mock");
436+
, "S3");
434437
}
435438
//6 remove topic level offload policy, offloader should become namespaceOffloader
436439
LedgerOffloader namespaceOffloader = mock(LedgerOffloader.class);
437-
when(namespaceOffloader.getOffloadDriverName()).thenReturn("s3");
440+
when(namespaceOffloader.getOffloadDriverName()).thenReturn("S3");
438441
Map<NamespaceName, LedgerOffloader> map = new HashMap<>();
439442
map.put(TopicName.get(topicName).getNamespaceObject(), namespaceOffloader);
440443
doReturn(map).when(pulsar).getLedgerOffloaderMap();
@@ -450,14 +453,14 @@ private void testOffload(boolean isPartitioned) throws Exception {
450453
.getTopicIfExists(TopicName.get(topicName).getPartition(i).toString()).get().get();
451454
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
452455
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
453-
, "s3");
456+
, "S3");
454457
}
455458
} else {
456459
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
457460
.getTopic(topicName, false).get().get();
458461
assertNotNull(topic.getManagedLedger().getConfig().getLedgerOffloader());
459462
assertEquals(topic.getManagedLedger().getConfig().getLedgerOffloader().getOffloadDriverName()
460-
, "s3");
463+
, "S3");
461464
}
462465
}
463466

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pulsar.admin.cli;
2020

21+
import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
22+
import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
2123
import com.beust.jcommander.Parameter;
2224
import com.beust.jcommander.ParameterException;
2325
import com.beust.jcommander.Parameters;
@@ -2319,20 +2321,6 @@ public boolean isS3Driver(String driver) {
23192321
return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
23202322
}
23212323

2322-
public boolean positiveCheck(String paramName, long value) {
2323-
if (value <= 0) {
2324-
throw new ParameterException(paramName + " cannot be less than or equal to 0!");
2325-
}
2326-
return true;
2327-
}
2328-
2329-
public boolean maxValueCheck(String paramName, long value, long maxValue) {
2330-
if (value > maxValue) {
2331-
throw new ParameterException(paramName + " cannot be greater than " + maxValue + "!");
2332-
}
2333-
return true;
2334-
}
2335-
23362324
@Override
23372325
void run() throws PulsarAdminException {
23382326
String namespace = validateNamespace(params);

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
*/
1919
package org.apache.pulsar.admin.cli;
2020

21+
import static org.apache.pulsar.admin.cli.utils.CmdUtils.maxValueCheck;
22+
import static org.apache.pulsar.admin.cli.utils.CmdUtils.positiveCheck;
2123
import com.beust.jcommander.Parameter;
2224
import com.beust.jcommander.ParameterException;
2325
import com.beust.jcommander.Parameters;
26+
import com.google.common.base.Strings;
2427
import java.util.Arrays;
2528
import java.util.HashSet;
2629
import java.util.List;
@@ -1764,24 +1767,24 @@ private class SetOffloadPolicies extends CliCommand {
17641767
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
17651768
description = "ManagedLedger offload max block Size in bytes,"
17661769
+ "s3 and google-cloud-storage requires this parameter")
1767-
private int maxBlockSizeInBytes;
1770+
private int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
17681771

17691772
@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
17701773
description = "ManagedLedger offload read buffer size in bytes,"
17711774
+ "s3 and google-cloud-storage requires this parameter")
1772-
private int readBufferSizeInBytes;
1775+
private int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
17731776

17741777
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
1775-
, description = "ManagedLedger offload threshold in bytes", required = true)
1776-
private long offloadThresholdInBytes;
1778+
, description = "ManagedLedger offload threshold in bytes")
1779+
private Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
17771780

17781781
@Parameter(names = {"-ts", "--offloadThresholdInSeconds"}
17791782
, description = "ManagedLedger offload threshold in seconds")
1780-
private Long offloadThresholdInSeconds;
1783+
private Long offloadThresholdInSeconds = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_SECONDS;
17811784

17821785
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
17831786
, description = "ManagedLedger offload deletion lag in bytes")
1784-
private Long offloadDeletionLagInMillis;
1787+
private Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
17851788

17861789
@Parameter(
17871790
names = {"--offloadedReadPriority", "-orp"},
@@ -1798,10 +1801,38 @@ private class SetOffloadPolicies extends CliCommand {
17981801
+ "If set to true, the policy will be replicate to other clusters asynchronously")
17991802
private boolean isGlobal = false;
18001803

1804+
public final List<String> driverNames = OffloadPoliciesImpl.DRIVER_NAMES;
1805+
1806+
public boolean driverSupported(String driver) {
1807+
return driverNames.stream().anyMatch(d -> d.equalsIgnoreCase(driver));
1808+
}
1809+
1810+
public boolean isS3Driver(String driver) {
1811+
if (StringUtils.isEmpty(driver)) {
1812+
return false;
1813+
}
1814+
return driver.equalsIgnoreCase(driverNames.get(0)) || driver.equalsIgnoreCase(driverNames.get(1));
1815+
}
1816+
18011817
@Override
18021818
void run() throws PulsarAdminException {
18031819
String persistentTopic = validatePersistentTopic(params);
18041820

1821+
if (!driverSupported(driver)) {
1822+
throw new ParameterException("The driver " + driver + " is not supported, "
1823+
+ "(Possible values: " + String.join(",", driverNames) + ").");
1824+
}
1825+
1826+
if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) {
1827+
throw new ParameterException(
1828+
"Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set"
1829+
+ " if s3 offload enabled");
1830+
}
1831+
positiveCheck("maxBlockSizeInBytes", maxBlockSizeInBytes);
1832+
maxValueCheck("maxBlockSizeInBytes", maxBlockSizeInBytes, Integer.MAX_VALUE);
1833+
positiveCheck("readBufferSizeInBytes", readBufferSizeInBytes);
1834+
maxValueCheck("readBufferSizeInBytes", readBufferSizeInBytes, Integer.MAX_VALUE);
1835+
18051836
OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
18061837

18071838
if (this.offloadReadPriorityStr != null) {

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/CmdUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,18 @@ public static <T> T loadConfig(String file, Class<T> clazz) throws IOException {
5656
}
5757
}
5858
}
59+
60+
public static boolean positiveCheck(String paramName, long value) {
61+
if (value <= 0) {
62+
throw new ParameterException(paramName + " cannot be less than or equal to 0!");
63+
}
64+
return true;
65+
}
66+
67+
public static boolean maxValueCheck(String paramName, long value, long maxValue) {
68+
if (value > maxValue) {
69+
throw new ParameterException(paramName + " cannot be greater than " + maxValue + "!");
70+
}
71+
return true;
72+
}
5973
}

0 commit comments

Comments
 (0)