Skip to content

Commit 8910eea

Browse files
author
nicklixinyang
committed
fix the topic offload policy param will be covered unexpected.
1 parent eddbdd8 commit 8910eea

File tree

6 files changed

+199
-10
lines changed

6 files changed

+199
-10
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

+12
Original file line numberDiff line numberDiff line change
@@ -2453,6 +2453,18 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo
24532453
} else {
24542454
policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
24552455
}
2456+
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes(),
2457+
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES)) {
2458+
offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(
2459+
policies.offload_policies.getManagedLedgerOffloadMaxBlockSizeInBytes());
2460+
}
2461+
2462+
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes(),
2463+
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES)) {
2464+
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(
2465+
policies.offload_policies.getManagedLedgerOffloadReadBufferSizeInBytes());
2466+
}
2467+
24562468
policies.offload_policies = offloadPolicies;
24572469
return policies;
24582470
}).thenApply(r -> {

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

+24
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.HashMap;
3434
import java.util.List;
3535
import java.util.Map;
36+
import java.util.Objects;
3637
import java.util.Optional;
3738
import java.util.Set;
3839
import java.util.concurrent.CompletableFuture;
@@ -850,6 +851,29 @@ protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(bool
850851
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
851852
.thenCompose(op -> {
852853
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
854+
OffloadPoliciesImpl currentOffloadPolicies = topicPolicies.getOffloadPolicies();
855+
if (currentOffloadPolicies != null && offloadPolicies != null){
856+
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
857+
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
858+
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(
859+
currentOffloadPolicies.getManagedLedgerOffloadDeletionLagInMillis());
860+
}
861+
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
862+
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) {
863+
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(
864+
currentOffloadPolicies.getManagedLedgerOffloadThresholdInBytes());
865+
}
866+
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes(),
867+
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES)) {
868+
offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(
869+
currentOffloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes());
870+
}
871+
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes(),
872+
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES)) {
873+
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(
874+
currentOffloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes());
875+
}
876+
}
853877
topicPolicies.setOffloadPolicies(offloadPolicies);
854878
topicPolicies.setIsGlobal(isGlobal);
855879
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);

Diff for: pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -1340,6 +1340,17 @@ public void topicPolicies() throws Exception {
13401340
.setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
13411341
OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null,
13421342
8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST));
1343+
1344+
// test the set offload policies don't cover old value
1345+
cmdTopics = new CmdTopicPolicies(() -> admin);
1346+
cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" +
1347+
" region -b bucket -t 10 -e endpoint -orp tiered-storage-first -g"));
1348+
verify(mockGlobalTopicsPolicies)
1349+
.setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
1350+
OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null,
1351+
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, 10L,
1352+
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadedReadPriority.TIERED_STORAGE_FIRST));
1353+
13431354
}
13441355

13451356
@Test

Diff for: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java

+61-5
Original file line numberDiff line numberDiff line change
@@ -1681,20 +1681,20 @@ private class SetOffloadPolicies extends CliCommand {
16811681
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
16821682
description = "ManagedLedger offload max block Size in bytes,"
16831683
+ "s3 and google-cloud-storage requires this parameter")
1684-
private int maxBlockSizeInBytes;
1684+
private String maxBlockSizeInBytesStr;
16851685

16861686
@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
16871687
description = "ManagedLedger offload read buffer size in bytes,"
16881688
+ "s3 and google-cloud-storage requires this parameter")
1689-
private int readBufferSizeInBytes;
1689+
private String readBufferSizeInBytesStr;
16901690

16911691
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
16921692
, description = "ManagedLedger offload threshold in bytes", required = true)
1693-
private long offloadThresholdInBytes;
1693+
private String offloadThresholdInBytesStr;
16941694

16951695
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
16961696
, description = "ManagedLedger offload deletion lag in bytes")
1697-
private Long offloadDeletionLagInMillis;
1697+
private String offloadDeletionLagInMillisStr;
16981698

16991699
@Parameter(
17001700
names = {"--offloadedReadPriority", "-orp"},
@@ -1711,13 +1711,27 @@ private class SetOffloadPolicies extends CliCommand {
17111711
+ "If set to true, the policy will be replicate to other clusters asynchronously")
17121712
private boolean isGlobal = false;
17131713

1714+
public boolean positiveCheck(String paramName, long value) {
1715+
if (value <= 0) {
1716+
throw new ParameterException(paramName + " is not be negative or 0!");
1717+
}
1718+
return true;
1719+
}
1720+
1721+
public boolean maxValueCheck(String paramName, long value, long maxValue) {
1722+
if (value > maxValue) {
1723+
throw new ParameterException(paramName + " is not bigger than " + maxValue + "!");
1724+
}
1725+
return true;
1726+
}
1727+
17141728
@Override
17151729
void run() throws PulsarAdminException {
17161730
String persistentTopic = validatePersistentTopic(params);
17171731

17181732
OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
17191733

1720-
if (this.offloadReadPriorityStr != null) {
1734+
if (StringUtils.isNotBlank(offloadReadPriorityStr)) {
17211735
try {
17221736
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
17231737
} catch (Exception e) {
@@ -1729,6 +1743,48 @@ void run() throws PulsarAdminException {
17291743
}
17301744
}
17311745

1746+
int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
1747+
if (StringUtils.isNotBlank(maxBlockSizeInBytesStr)) {
1748+
long maxBlockSize = validateSizeString(maxBlockSizeInBytesStr);
1749+
if (positiveCheck("MaxBlockSizeInBytes", maxBlockSize)
1750+
&& maxValueCheck("MaxBlockSizeInBytes", maxBlockSize, Integer.MAX_VALUE)) {
1751+
maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue();
1752+
}
1753+
}
1754+
1755+
int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
1756+
if (StringUtils.isNotBlank(readBufferSizeInBytesStr)) {
1757+
long readBufferSize = validateSizeString(readBufferSizeInBytesStr);
1758+
if (positiveCheck("readBufferSizeInBytes", readBufferSize)
1759+
&& maxValueCheck("readBufferSizeInBytes", readBufferSize, Integer.MAX_VALUE)) {
1760+
readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue();
1761+
}
1762+
}
1763+
1764+
Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
1765+
if (StringUtils.isNotBlank(offloadThresholdInBytesStr)) {
1766+
long offloadThreshold = validateSizeString(offloadThresholdInBytesStr);
1767+
if (positiveCheck("offloadThresholdInBytes", offloadThreshold)
1768+
&& maxValueCheck("offloadThresholdInBytes", offloadThreshold, Long.MAX_VALUE)) {
1769+
offloadThresholdInBytes = offloadThreshold;
1770+
}
1771+
}
1772+
1773+
Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
1774+
if (StringUtils.isNotBlank(offloadDeletionLagInMillisStr)) {
1775+
Long offloadThreshold;
1776+
try {
1777+
offloadThreshold = TimeUnit.SECONDS.toMillis(
1778+
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadDeletionLagInMillisStr));
1779+
} catch (IllegalArgumentException exception) {
1780+
throw new ParameterException(exception.getMessage());
1781+
}
1782+
if (positiveCheck("offloadDeletionLagInMillis", offloadThreshold)
1783+
&& maxValueCheck("offloadDeletionLagInMillis", offloadThreshold, Long.MAX_VALUE)) {
1784+
offloadDeletionLagInMillis = offloadThreshold;
1785+
}
1786+
}
1787+
17321788
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
17331789
s3Role, s3RoleSessionName,
17341790
awsId, awsSecret,

Diff for: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

+62-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.function.Supplier;
4848
import java.util.stream.Collectors;
4949
import lombok.Getter;
50+
import org.apache.commons.lang3.StringUtils;
5051
import org.apache.pulsar.client.admin.ListTopicsOptions;
5152
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
5253
import org.apache.pulsar.client.admin.OffloadProcessStatus;
@@ -2018,20 +2019,20 @@ private class SetOffloadPolicies extends CliCommand {
20182019
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
20192020
description = "ManagedLedger offload max block Size in bytes,"
20202021
+ "s3 and google-cloud-storage requires this parameter")
2021-
private int maxBlockSizeInBytes;
2022+
private String maxBlockSizeInBytesStr;
20222023

20232024
@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
20242025
description = "ManagedLedger offload read buffer size in bytes,"
20252026
+ "s3 and google-cloud-storage requires this parameter")
2026-
private int readBufferSizeInBytes;
2027+
private String readBufferSizeInBytesStr;
20272028

20282029
@Parameter(names = {"-t", "--offloadThresholdInBytes"}
20292030
, description = "ManagedLedger offload threshold in bytes", required = true)
2030-
private long offloadThresholdInBytes;
2031+
private String offloadThresholdInBytesStr;
20312032

20322033
@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
20332034
, description = "ManagedLedger offload deletion lag in bytes")
2034-
private Long offloadDeletionLagInMillis;
2035+
private String offloadDeletionLagInMillisStr;
20352036

20362037
@Parameter(names = {"--offloadedReadPriority", "-orp"},
20372038
description = "Read priority for offloaded messages. "
@@ -2043,13 +2044,27 @@ private class SetOffloadPolicies extends CliCommand {
20432044
)
20442045
private String offloadReadPriorityStr;
20452046

2047+
public boolean positiveCheck(String paramName, long value) {
2048+
if (value <= 0) {
2049+
throw new ParameterException(paramName + " is not be negative or 0!");
2050+
}
2051+
return true;
2052+
}
2053+
2054+
public boolean maxValueCheck(String paramName, long value, long maxValue) {
2055+
if (value > maxValue) {
2056+
throw new ParameterException(paramName + " is not bigger than " + maxValue + "!");
2057+
}
2058+
return true;
2059+
}
2060+
20462061
@Override
20472062
void run() throws PulsarAdminException {
20482063
String persistentTopic = validatePersistentTopic(params);
20492064

20502065
OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
20512066

2052-
if (this.offloadReadPriorityStr != null) {
2067+
if (StringUtils.isNotBlank(offloadReadPriorityStr)) {
20532068
try {
20542069
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
20552070
} catch (Exception e) {
@@ -2061,6 +2076,48 @@ void run() throws PulsarAdminException {
20612076
}
20622077
}
20632078

2079+
int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
2080+
if (StringUtils.isNotBlank(maxBlockSizeInBytesStr)) {
2081+
long maxBlockSize = validateSizeString(maxBlockSizeInBytesStr);
2082+
if (positiveCheck("MaxBlockSizeInBytes", maxBlockSize)
2083+
&& maxValueCheck("MaxBlockSizeInBytes", maxBlockSize, Integer.MAX_VALUE)) {
2084+
maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue();
2085+
}
2086+
}
2087+
2088+
int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
2089+
if (StringUtils.isNotBlank(readBufferSizeInBytesStr)) {
2090+
long readBufferSize = validateSizeString(readBufferSizeInBytesStr);
2091+
if (positiveCheck("readBufferSizeInBytes", readBufferSize)
2092+
&& maxValueCheck("readBufferSizeInBytes", readBufferSize, Integer.MAX_VALUE)) {
2093+
readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue();
2094+
}
2095+
}
2096+
2097+
Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
2098+
if (StringUtils.isNotBlank(offloadThresholdInBytesStr)) {
2099+
long offloadThreshold = validateSizeString(offloadThresholdInBytesStr);
2100+
if (positiveCheck("offloadThresholdInBytes", offloadThreshold)
2101+
&& maxValueCheck("offloadThresholdInBytes", offloadThreshold, Long.MAX_VALUE)) {
2102+
offloadThresholdInBytes = offloadThreshold;
2103+
}
2104+
}
2105+
2106+
Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
2107+
if (StringUtils.isNotBlank(offloadDeletionLagInMillisStr)) {
2108+
Long offloadThreshold;
2109+
try {
2110+
offloadThreshold = TimeUnit.SECONDS.toMillis(
2111+
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadDeletionLagInMillisStr));
2112+
} catch (IllegalArgumentException exception) {
2113+
throw new ParameterException(exception.getMessage());
2114+
}
2115+
if (positiveCheck("offloadDeletionLagInMillis", offloadThreshold)
2116+
&& maxValueCheck("offloadDeletionLagInMillis", offloadThreshold, Long.MAX_VALUE)) {
2117+
offloadDeletionLagInMillis = offloadThreshold;
2118+
}
2119+
}
2120+
20642121
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
20652122
s3Role, s3RoleSessionName,
20662123
awsId, awsSecret,

Diff for: site2/docs/reference-pulsar-admin.md

+29
Original file line numberDiff line numberDiff line change
@@ -2470,6 +2470,35 @@ Options
24702470
|`-w`, `--wait-complete`|Wait for compaction to complete|false|
24712471

24722472

2473+
### `set-offload-policies`
2474+
Set the offload policy for a topic.
2475+
2476+
Usage
2477+
2478+
```bash
2479+
2480+
$ pulsar-admin topic set-offload-policies tenant/namespace/topic options
2481+
2482+
```
2483+
2484+
Options
2485+
2486+
|Flag|Description|Default|
2487+
|----|---|---|
2488+
|`-d`, `--driver`|Driver to use to offload old data to long term storage,(Possible values: S3, aws-s3, google-cloud-storage)||
2489+
|`-r`, `--region`|The long term storage region||
2490+
|`-b`, `--bucket`|Bucket to place offloaded ledger into||
2491+
|`-e`, `--endpoint`|Alternative endpoint to connect to||
2492+
|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
2493+
|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or aws-s3||
2494+
|`-ro`, `--s3-role`|S3 Role used for STSAssumeRoleSessionCredentialsProvider using driver S3 or aws-s3||
2495+
|`-rsn`, `--s3-role-session-name`|S3 role session name used for STSAssumeRoleSessionCredentialsProvider using driver S3 or aws-s3||
2496+
|`-m`, `--maxBlockSizeInBytes`|Max block size|64MB|
2497+
|`-rb`, `--readBufferSizeInBytes`|Read buffer size|1MB|
2498+
|`-t`, `--offloadThresholdInBytes`|Offload after threshold size (eg: 1M, 5M)||
2499+
|`-dl`, `--offloadDeletionLagInMillis`|Offload after elapsed in millis (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).||
2500+
2501+
24732502
### `create-partitioned-topic`
24742503
Create a partitioned topic. A partitioned topic must be created before producers can publish to it.
24752504

0 commit comments

Comments
 (0)