diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5139bbd00d5c5..7dd64fd97eaa1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2453,6 +2453,18 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo } else { policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes(); } + if (Objects.equals(offloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes(), + OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES)) { + offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes( + policies.offload_policies.getManagedLedgerOffloadMaxBlockSizeInBytes()); + } + + if (Objects.equals(offloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes(), + OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES)) { + offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes( + policies.offload_policies.getManagedLedgerOffloadReadBufferSizeInBytes()); + } + policies.offload_policies = offloadPolicies; return policies; }).thenApply(r -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 6f48145b0ed36..5f4c153eebfbc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -850,6 +851,29 @@ protected CompletableFuture internalGetOffloadPolicies(bool return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + OffloadPoliciesImpl currentOffloadPolicies = topicPolicies.getOffloadPolicies(); + if (currentOffloadPolicies != null && offloadPolicies != null){ + if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), + OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) { + offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis( + currentOffloadPolicies.getManagedLedgerOffloadDeletionLagInMillis()); + } + if (Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), + OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) { + offloadPolicies.setManagedLedgerOffloadThresholdInBytes( + currentOffloadPolicies.getManagedLedgerOffloadThresholdInBytes()); + } + if (Objects.equals(offloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes(), + OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES)) { + offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes( + currentOffloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes()); + } + if (Objects.equals(offloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes(), + OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES)) { + offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes( + currentOffloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes()); + } + } topicPolicies.setOffloadPolicies(offloadPolicies); topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index e3e9157233c00..e094e32db78b6 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1340,6 +1340,17 @@ public void topicPolicies() throws Exception { .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, 8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + + // test the set offload policies don't cover old value + cmdTopics = new CmdTopicPolicies(() -> admin); + cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" + + " region -b bucket -t 10 -e endpoint -orp tiered-storage-first -g")); + verify(mockGlobalTopicsPolicies) + .setOffloadPolicies("persistent://myprop/clust/ns1/ds1", + OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null, + OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, 10L, + OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadedReadPriority.TIERED_STORAGE_FIRST)); + } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index b3bf27fe54adf..b292fa9024bc6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -1681,20 +1681,20 @@ private class SetOffloadPolicies extends CliCommand { @Parameter(names = {"-m", "--maxBlockSizeInBytes"}, description = "ManagedLedger offload max block Size in bytes," + "s3 and google-cloud-storage requires this parameter") - private int maxBlockSizeInBytes; + private String maxBlockSizeInBytesStr; @Parameter(names = {"-rb", "--readBufferSizeInBytes"}, description = "ManagedLedger offload read buffer size in bytes," + "s3 and google-cloud-storage requires this parameter") - private int readBufferSizeInBytes; + private String readBufferSizeInBytesStr; @Parameter(names = {"-t", "--offloadThresholdInBytes"} , description = "ManagedLedger offload threshold in bytes", required = true) - private long offloadThresholdInBytes; + private String offloadThresholdInBytesStr; @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") - private Long offloadDeletionLagInMillis; + private String offloadDeletionLagInMillisStr; @Parameter( names = {"--offloadedReadPriority", "-orp"}, @@ -1711,13 +1711,27 @@ private class SetOffloadPolicies extends CliCommand { + "If set to true, the policy will be replicate to other clusters asynchronously") private boolean isGlobal = false; + public boolean positiveCheck(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } + + public boolean maxValueCheck(String paramName, long value, long maxValue) { + if (value > maxValue) { + throw new ParameterException(paramName + " is not bigger than " + maxValue + "!"); + } + return true; + } + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY; - if (this.offloadReadPriorityStr != null) { + if (StringUtils.isNotBlank(offloadReadPriorityStr)) { try { offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr); } catch (Exception e) { @@ -1729,6 +1743,48 @@ void run() throws PulsarAdminException { } } + int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; + if (StringUtils.isNotBlank(maxBlockSizeInBytesStr)) { + long maxBlockSize = validateSizeString(maxBlockSizeInBytesStr); + if (positiveCheck("MaxBlockSizeInBytes", maxBlockSize) + && maxValueCheck("MaxBlockSizeInBytes", maxBlockSize, Integer.MAX_VALUE)) { + maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue(); + } + } + + int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES; + if (StringUtils.isNotBlank(readBufferSizeInBytesStr)) { + long readBufferSize = validateSizeString(readBufferSizeInBytesStr); + if (positiveCheck("readBufferSizeInBytes", readBufferSize) + && maxValueCheck("readBufferSizeInBytes", readBufferSize, Integer.MAX_VALUE)) { + readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue(); + } + } + + Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; + if (StringUtils.isNotBlank(offloadThresholdInBytesStr)) { + long offloadThreshold = validateSizeString(offloadThresholdInBytesStr); + if (positiveCheck("offloadThresholdInBytes", offloadThreshold) + && maxValueCheck("offloadThresholdInBytes", offloadThreshold, Long.MAX_VALUE)) { + offloadThresholdInBytes = offloadThreshold; + } + } + + Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; + if (StringUtils.isNotBlank(offloadDeletionLagInMillisStr)) { + Long offloadThreshold; + try { + offloadThreshold = TimeUnit.SECONDS.toMillis( + RelativeTimeUtil.parseRelativeTimeInSeconds(offloadDeletionLagInMillisStr)); + } catch (IllegalArgumentException exception) { + throw new ParameterException(exception.getMessage()); + } + if (positiveCheck("offloadDeletionLagInMillis", offloadThreshold) + && maxValueCheck("offloadDeletionLagInMillis", offloadThreshold, Long.MAX_VALUE)) { + offloadDeletionLagInMillis = offloadThreshold; + } + } + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint, s3Role, s3RoleSessionName, awsId, awsSecret, diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 4b73703395ed2..0c75d4ff05cf3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -47,6 +47,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; @@ -2018,20 +2019,20 @@ private class SetOffloadPolicies extends CliCommand { @Parameter(names = {"-m", "--maxBlockSizeInBytes"}, description = "ManagedLedger offload max block Size in bytes," + "s3 and google-cloud-storage requires this parameter") - private int maxBlockSizeInBytes; + private String maxBlockSizeInBytesStr; @Parameter(names = {"-rb", "--readBufferSizeInBytes"}, description = "ManagedLedger offload read buffer size in bytes," + "s3 and google-cloud-storage requires this parameter") - private int readBufferSizeInBytes; + private String readBufferSizeInBytesStr; @Parameter(names = {"-t", "--offloadThresholdInBytes"} , description = "ManagedLedger offload threshold in bytes", required = true) - private long offloadThresholdInBytes; + private String offloadThresholdInBytesStr; @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"} , description = "ManagedLedger offload deletion lag in bytes") - private Long offloadDeletionLagInMillis; + private String offloadDeletionLagInMillisStr; @Parameter(names = {"--offloadedReadPriority", "-orp"}, description = "Read priority for offloaded messages. " @@ -2043,13 +2044,27 @@ private class SetOffloadPolicies extends CliCommand { ) private String offloadReadPriorityStr; + public boolean positiveCheck(String paramName, long value) { + if (value <= 0) { + throw new ParameterException(paramName + " is not be negative or 0!"); + } + return true; + } + + public boolean maxValueCheck(String paramName, long value, long maxValue) { + if (value > maxValue) { + throw new ParameterException(paramName + " is not bigger than " + maxValue + "!"); + } + return true; + } + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY; - if (this.offloadReadPriorityStr != null) { + if (StringUtils.isNotBlank(offloadReadPriorityStr)) { try { offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr); } catch (Exception e) { @@ -2061,6 +2076,48 @@ void run() throws PulsarAdminException { } } + int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; + if (StringUtils.isNotBlank(maxBlockSizeInBytesStr)) { + long maxBlockSize = validateSizeString(maxBlockSizeInBytesStr); + if (positiveCheck("MaxBlockSizeInBytes", maxBlockSize) + && maxValueCheck("MaxBlockSizeInBytes", maxBlockSize, Integer.MAX_VALUE)) { + maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue(); + } + } + + int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES; + if (StringUtils.isNotBlank(readBufferSizeInBytesStr)) { + long readBufferSize = validateSizeString(readBufferSizeInBytesStr); + if (positiveCheck("readBufferSizeInBytes", readBufferSize) + && maxValueCheck("readBufferSizeInBytes", readBufferSize, Integer.MAX_VALUE)) { + readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue(); + } + } + + Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; + if (StringUtils.isNotBlank(offloadThresholdInBytesStr)) { + long offloadThreshold = validateSizeString(offloadThresholdInBytesStr); + if (positiveCheck("offloadThresholdInBytes", offloadThreshold) + && maxValueCheck("offloadThresholdInBytes", offloadThreshold, Long.MAX_VALUE)) { + offloadThresholdInBytes = offloadThreshold; + } + } + + Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; + if (StringUtils.isNotBlank(offloadDeletionLagInMillisStr)) { + Long offloadThreshold; + try { + offloadThreshold = TimeUnit.SECONDS.toMillis( + RelativeTimeUtil.parseRelativeTimeInSeconds(offloadDeletionLagInMillisStr)); + } catch (IllegalArgumentException exception) { + throw new ParameterException(exception.getMessage()); + } + if (positiveCheck("offloadDeletionLagInMillis", offloadThreshold) + && maxValueCheck("offloadDeletionLagInMillis", offloadThreshold, Long.MAX_VALUE)) { + offloadDeletionLagInMillis = offloadThreshold; + } + } + OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint, s3Role, s3RoleSessionName, awsId, awsSecret, diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index a6d0aaccea083..d527fde5b821d 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -2470,6 +2470,35 @@ Options |`-w`, `--wait-complete`|Wait for compaction to complete|false| +### `set-offload-policies` +Set the offload policy for a topic. + +Usage + +```bash + +$ pulsar-admin topic set-offload-policies tenant/namespace/topic options + +``` + +Options + +|Flag|Description|Default| +|----|---|---| +|`-d`, `--driver`|Driver to use to offload old data to long term storage,(Possible values: S3, aws-s3, google-cloud-storage)|| +|`-r`, `--region`|The long term storage region|| +|`-b`, `--bucket`|Bucket to place offloaded ledger into|| +|`-e`, `--endpoint`|Alternative endpoint to connect to|| +|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3|| +|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or aws-s3|| +|`-ro`, `--s3-role`|S3 Role used for STSAssumeRoleSessionCredentialsProvider using driver S3 or aws-s3|| +|`-rsn`, `--s3-role-session-name`|S3 role session name used for STSAssumeRoleSessionCredentialsProvider using driver S3 or aws-s3|| +|`-m`, `--maxBlockSizeInBytes`|Max block size|64MB| +|`-rb`, `--readBufferSizeInBytes`|Read buffer size|1MB| +|`-t`, `--offloadThresholdInBytes`|Offload after threshold size (eg: 1M, 5M)|| +|`-dl`, `--offloadDeletionLagInMillis`|Offload after elapsed in millis (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).|| + + ### `create-partitioned-topic` Create a partitioned topic. A partitioned topic must be created before producers can publish to it.