Skip to content

[fix][client-tool]fix the topic offload policy param will be covered unexpected. #16357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,18 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo
} else {
policies.offload_threshold = offloadPolicies.getManagedLedgerOffloadThresholdInBytes();
}
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes(),
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES)) {
offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(
policies.offload_policies.getManagedLedgerOffloadMaxBlockSizeInBytes());
}

if (Objects.equals(offloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes(),
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES)) {
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(
policies.offload_policies.getManagedLedgerOffloadReadBufferSizeInBytes());
}

policies.offload_policies = offloadPolicies;
return policies;
}).thenApply(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -850,6 +851,29 @@ protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(bool
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
OffloadPoliciesImpl currentOffloadPolicies = topicPolicies.getOffloadPolicies();
if (currentOffloadPolicies != null && offloadPolicies != null){
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(),
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS)) {
offloadPolicies.setManagedLedgerOffloadDeletionLagInMillis(
currentOffloadPolicies.getManagedLedgerOffloadDeletionLagInMillis());
Comment on lines +856 to +859
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will change the behavior. After this change, users are not able to set the lagInMillis to null?

}
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(),
OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES)) {
offloadPolicies.setManagedLedgerOffloadThresholdInBytes(
currentOffloadPolicies.getManagedLedgerOffloadThresholdInBytes());
}
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes(),
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES)) {
offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(
currentOffloadPolicies.getManagedLedgerOffloadMaxBlockSizeInBytes());
}
if (Objects.equals(offloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes(),
OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES)) {
offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(
currentOffloadPolicies.getManagedLedgerOffloadReadBufferSizeInBytes());
}
}
topicPolicies.setOffloadPolicies(offloadPolicies);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,17 @@ public void topicPolicies() throws Exception {
.setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null,
8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST));

// test the set offload policies don't cover old value
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" +
" region -b bucket -t 10 -e endpoint -orp tiered-storage-first -g"));
verify(mockGlobalTopicsPolicies)
.setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null,
OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, 10L,
OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, OffloadedReadPriority.TIERED_STORAGE_FIRST));

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1681,20 +1681,20 @@ private class SetOffloadPolicies extends CliCommand {
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
description = "ManagedLedger offload max block Size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
private String maxBlockSizeInBytesStr;

@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
description = "ManagedLedger offload read buffer size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
private String readBufferSizeInBytesStr;

@Parameter(names = {"-t", "--offloadThresholdInBytes"}
, description = "ManagedLedger offload threshold in bytes", required = true)
private long offloadThresholdInBytes;
private String offloadThresholdInBytesStr;

@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
private String offloadDeletionLagInMillisStr;

@Parameter(
names = {"--offloadedReadPriority", "-orp"},
Expand All @@ -1711,13 +1711,27 @@ private class SetOffloadPolicies extends CliCommand {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;

public boolean positiveCheck(String paramName, long value) {
if (value <= 0) {
throw new ParameterException(paramName + " is not be negative or 0!");
}
return true;
}

public boolean maxValueCheck(String paramName, long value, long maxValue) {
if (value > maxValue) {
throw new ParameterException(paramName + " is not bigger than " + maxValue + "!");
}
return true;
}

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;

if (this.offloadReadPriorityStr != null) {
if (StringUtils.isNotBlank(offloadReadPriorityStr)) {
try {
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
} catch (Exception e) {
Expand All @@ -1729,6 +1743,48 @@ void run() throws PulsarAdminException {
}
}

int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
if (StringUtils.isNotBlank(maxBlockSizeInBytesStr)) {
long maxBlockSize = validateSizeString(maxBlockSizeInBytesStr);
if (positiveCheck("MaxBlockSizeInBytes", maxBlockSize)
&& maxValueCheck("MaxBlockSizeInBytes", maxBlockSize, Integer.MAX_VALUE)) {
maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue();
}
}

int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
if (StringUtils.isNotBlank(readBufferSizeInBytesStr)) {
long readBufferSize = validateSizeString(readBufferSizeInBytesStr);
if (positiveCheck("readBufferSizeInBytes", readBufferSize)
&& maxValueCheck("readBufferSizeInBytes", readBufferSize, Integer.MAX_VALUE)) {
readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue();
}
}

Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
if (StringUtils.isNotBlank(offloadThresholdInBytesStr)) {
long offloadThreshold = validateSizeString(offloadThresholdInBytesStr);
if (positiveCheck("offloadThresholdInBytes", offloadThreshold)
&& maxValueCheck("offloadThresholdInBytes", offloadThreshold, Long.MAX_VALUE)) {
offloadThresholdInBytes = offloadThreshold;
}
}

Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
if (StringUtils.isNotBlank(offloadDeletionLagInMillisStr)) {
Long offloadThreshold;
try {
offloadThreshold = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadDeletionLagInMillisStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (positiveCheck("offloadDeletionLagInMillis", offloadThreshold)
&& maxValueCheck("offloadDeletionLagInMillis", offloadThreshold, Long.MAX_VALUE)) {
offloadDeletionLagInMillis = offloadThreshold;
}
}

OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
s3Role, s3RoleSessionName,
awsId, awsSecret,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
Expand Down Expand Up @@ -2018,20 +2019,20 @@ private class SetOffloadPolicies extends CliCommand {
@Parameter(names = {"-m", "--maxBlockSizeInBytes"},
description = "ManagedLedger offload max block Size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int maxBlockSizeInBytes;
private String maxBlockSizeInBytesStr;

@Parameter(names = {"-rb", "--readBufferSizeInBytes"},
description = "ManagedLedger offload read buffer size in bytes,"
+ "s3 and google-cloud-storage requires this parameter")
private int readBufferSizeInBytes;
private String readBufferSizeInBytesStr;

@Parameter(names = {"-t", "--offloadThresholdInBytes"}
, description = "ManagedLedger offload threshold in bytes", required = true)
private long offloadThresholdInBytes;
private String offloadThresholdInBytesStr;

@Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
, description = "ManagedLedger offload deletion lag in bytes")
private Long offloadDeletionLagInMillis;
private String offloadDeletionLagInMillisStr;

@Parameter(names = {"--offloadedReadPriority", "-orp"},
description = "Read priority for offloaded messages. "
Expand All @@ -2043,13 +2044,27 @@ private class SetOffloadPolicies extends CliCommand {
)
private String offloadReadPriorityStr;

public boolean positiveCheck(String paramName, long value) {
if (value <= 0) {
throw new ParameterException(paramName + " is not be negative or 0!");
}
return true;
}

public boolean maxValueCheck(String paramName, long value, long maxValue) {
if (value > maxValue) {
throw new ParameterException(paramName + " is not bigger than " + maxValue + "!");
}
return true;
}

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);

OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;

if (this.offloadReadPriorityStr != null) {
if (StringUtils.isNotBlank(offloadReadPriorityStr)) {
try {
offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
} catch (Exception e) {
Expand All @@ -2061,6 +2076,48 @@ void run() throws PulsarAdminException {
}
}

int maxBlockSizeInBytes = OffloadPoliciesImpl.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
if (StringUtils.isNotBlank(maxBlockSizeInBytesStr)) {
long maxBlockSize = validateSizeString(maxBlockSizeInBytesStr);
if (positiveCheck("MaxBlockSizeInBytes", maxBlockSize)
&& maxValueCheck("MaxBlockSizeInBytes", maxBlockSize, Integer.MAX_VALUE)) {
maxBlockSizeInBytes = Long.valueOf(maxBlockSize).intValue();
}
}

int readBufferSizeInBytes = OffloadPoliciesImpl.DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
if (StringUtils.isNotBlank(readBufferSizeInBytesStr)) {
long readBufferSize = validateSizeString(readBufferSizeInBytesStr);
if (positiveCheck("readBufferSizeInBytes", readBufferSize)
&& maxValueCheck("readBufferSizeInBytes", readBufferSize, Integer.MAX_VALUE)) {
readBufferSizeInBytes = Long.valueOf(readBufferSize).intValue();
}
}

Long offloadThresholdInBytes = OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES;
if (StringUtils.isNotBlank(offloadThresholdInBytesStr)) {
long offloadThreshold = validateSizeString(offloadThresholdInBytesStr);
if (positiveCheck("offloadThresholdInBytes", offloadThreshold)
&& maxValueCheck("offloadThresholdInBytes", offloadThreshold, Long.MAX_VALUE)) {
offloadThresholdInBytes = offloadThreshold;
}
}

Long offloadDeletionLagInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
if (StringUtils.isNotBlank(offloadDeletionLagInMillisStr)) {
Long offloadThreshold;
try {
offloadThreshold = TimeUnit.SECONDS.toMillis(
RelativeTimeUtil.parseRelativeTimeInSeconds(offloadDeletionLagInMillisStr));
} catch (IllegalArgumentException exception) {
throw new ParameterException(exception.getMessage());
}
if (positiveCheck("offloadDeletionLagInMillis", offloadThreshold)
&& maxValueCheck("offloadDeletionLagInMillis", offloadThreshold, Long.MAX_VALUE)) {
offloadDeletionLagInMillis = offloadThreshold;
}
}

OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
s3Role, s3RoleSessionName,
awsId, awsSecret,
Expand Down
29 changes: 29 additions & 0 deletions site2/docs/reference-pulsar-admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -2470,6 +2470,35 @@ Options
|`-w`, `--wait-complete`|Wait for compaction to complete|false|


### `set-offload-policies`
Set the offload policy for a topic.

Usage

```bash

$ pulsar-admin topic set-offload-policies tenant/namespace/topic options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
$ pulsar-admin topic set-offload-policies tenant/namespace/topic options
$ pulsar-admin topics set-offload-policies tenant/namespace/topic options


```

Options

|Flag|Description|Default|
|----|---|---|
|`-d`, `--driver`|Driver to use to offload old data to long term storage,(Possible values: S3, aws-s3, google-cloud-storage)||
|`-r`, `--region`|The long term storage region||
|`-b`, `--bucket`|Bucket to place offloaded ledger into||
|`-e`, `--endpoint`|Alternative endpoint to connect to||
|`-i`, `--aws-id`|AWS Credential Id to use when using driver S3 or aws-s3||
|`-s`, `--aws-secret`|AWS Credential Secret to use when using driver S3 or aws-s3||
|`-ro`, `--s3-role`|S3 Role used for STSAssumeRoleSessionCredentialsProvider using driver S3 or aws-s3||
|`-rsn`, `--s3-role-session-name`|S3 role session name used for STSAssumeRoleSessionCredentialsProvider using driver S3 or aws-s3||
|`-m`, `--maxBlockSizeInBytes`|Max block size|64MB|
|`-rb`, `--readBufferSizeInBytes`|Read buffer size|1MB|
|`-t`, `--offloadThresholdInBytes`|Offload after threshold size (eg: 1M, 5M)||
|`-dl`, `--offloadDeletionLagInMillis`|Offload after elapsed in millis (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w).||


### `create-partitioned-topic`
Create a partitioned topic. A partitioned topic must be created before producers can publish to it.

Expand Down
Loading