46
46
import java .util .function .Supplier ;
47
47
import java .util .stream .Collectors ;
48
48
import lombok .Getter ;
49
+ import org .apache .commons .lang3 .StringUtils ;
49
50
import org .apache .pulsar .client .admin .ListTopicsOptions ;
50
51
import org .apache .pulsar .client .admin .LongRunningProcessStatus ;
51
52
import org .apache .pulsar .client .admin .OffloadProcessStatus ;
@@ -2015,20 +2016,20 @@ private class SetOffloadPolicies extends CliCommand {
2015
2016
@ Parameter (names = {"-m" , "--maxBlockSizeInBytes" },
2016
2017
description = "ManagedLedger offload max block Size in bytes,"
2017
2018
+ "s3 and google-cloud-storage requires this parameter" )
2018
- private int maxBlockSizeInBytes ;
2019
+ private String maxBlockSizeInBytesStr ;
2019
2020
2020
2021
@ Parameter (names = {"-rb" , "--readBufferSizeInBytes" },
2021
2022
description = "ManagedLedger offload read buffer size in bytes,"
2022
2023
+ "s3 and google-cloud-storage requires this parameter" )
2023
- private int readBufferSizeInBytes ;
2024
+ private String readBufferSizeInBytesStr ;
2024
2025
2025
2026
@ Parameter (names = {"-t" , "--offloadThresholdInBytes" }
2026
2027
, description = "ManagedLedger offload threshold in bytes" , required = true )
2027
- private long offloadThresholdInBytes ;
2028
+ private String offloadThresholdInBytesStr ;
2028
2029
2029
2030
@ Parameter (names = {"-dl" , "--offloadDeletionLagInMillis" }
2030
2031
, description = "ManagedLedger offload deletion lag in bytes" )
2031
- private Long offloadDeletionLagInMillis ;
2032
+ private String offloadDeletionLagInMillisStr ;
2032
2033
2033
2034
@ Parameter (names = {"--offloadedReadPriority" , "-orp" },
2034
2035
description = "Read priority for offloaded messages. "
@@ -2040,13 +2041,27 @@ private class SetOffloadPolicies extends CliCommand {
2040
2041
)
2041
2042
private String offloadReadPriorityStr ;
2042
2043
2044
+ public boolean positiveCheck (String paramName , long value ) {
2045
+ if (value <= 0 ) {
2046
+ throw new ParameterException (paramName + " is not be negative or 0!" );
2047
+ }
2048
+ return true ;
2049
+ }
2050
+
2051
+ public boolean maxValueCheck (String paramName , long value , long maxValue ) {
2052
+ if (value > maxValue ) {
2053
+ throw new ParameterException (paramName + " is not bigger than " + maxValue + "!" );
2054
+ }
2055
+ return true ;
2056
+ }
2057
+
2043
2058
@ Override
2044
2059
void run () throws PulsarAdminException {
2045
2060
String persistentTopic = validatePersistentTopic (params );
2046
2061
2047
2062
OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl .DEFAULT_OFFLOADED_READ_PRIORITY ;
2048
2063
2049
- if (this . offloadReadPriorityStr != null ) {
2064
+ if (StringUtils . isNotBlank ( offloadReadPriorityStr ) ) {
2050
2065
try {
2051
2066
offloadedReadPriority = OffloadedReadPriority .fromString (this .offloadReadPriorityStr );
2052
2067
} catch (Exception e ) {
@@ -2058,6 +2073,48 @@ void run() throws PulsarAdminException {
2058
2073
}
2059
2074
}
2060
2075
2076
+ int maxBlockSizeInBytes = OffloadPoliciesImpl .DEFAULT_MAX_BLOCK_SIZE_IN_BYTES ;
2077
+ if (StringUtils .isNotBlank (maxBlockSizeInBytesStr )) {
2078
+ long maxBlockSize = validateSizeString (maxBlockSizeInBytesStr );
2079
+ if (positiveCheck ("MaxBlockSizeInBytes" , maxBlockSize )
2080
+ && maxValueCheck ("MaxBlockSizeInBytes" , maxBlockSize , Integer .MAX_VALUE )) {
2081
+ maxBlockSizeInBytes = Long .valueOf (maxBlockSize ).intValue ();
2082
+ }
2083
+ }
2084
+
2085
+ int readBufferSizeInBytes = OffloadPoliciesImpl .DEFAULT_READ_BUFFER_SIZE_IN_BYTES ;
2086
+ if (StringUtils .isNotBlank (readBufferSizeInBytesStr )) {
2087
+ long readBufferSize = validateSizeString (readBufferSizeInBytesStr );
2088
+ if (positiveCheck ("readBufferSizeInBytes" , readBufferSize )
2089
+ && maxValueCheck ("readBufferSizeInBytes" , readBufferSize , Integer .MAX_VALUE )) {
2090
+ readBufferSizeInBytes = Long .valueOf (readBufferSize ).intValue ();
2091
+ }
2092
+ }
2093
+
2094
+ Long offloadThresholdInBytes = OffloadPoliciesImpl .DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES ;
2095
+ if (StringUtils .isNotBlank (offloadThresholdInBytesStr )) {
2096
+ long offloadThreshold = validateSizeString (offloadThresholdInBytesStr );
2097
+ if (positiveCheck ("offloadThresholdInBytes" , offloadThreshold )
2098
+ && maxValueCheck ("offloadThresholdInBytes" , offloadThreshold , Long .MAX_VALUE )) {
2099
+ offloadThresholdInBytes = offloadThreshold ;
2100
+ }
2101
+ }
2102
+
2103
+ Long offloadDeletionLagInMillis = OffloadPoliciesImpl .DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS ;
2104
+ if (StringUtils .isNotBlank (offloadDeletionLagInMillisStr )) {
2105
+ Long offloadThreshold ;
2106
+ try {
2107
+ offloadThreshold = TimeUnit .SECONDS .toMillis (
2108
+ RelativeTimeUtil .parseRelativeTimeInSeconds (offloadDeletionLagInMillisStr ));
2109
+ } catch (IllegalArgumentException exception ) {
2110
+ throw new ParameterException (exception .getMessage ());
2111
+ }
2112
+ if (positiveCheck ("offloadDeletionLagInMillis" , offloadThreshold )
2113
+ && maxValueCheck ("offloadDeletionLagInMillis" , offloadThreshold , Long .MAX_VALUE )) {
2114
+ offloadDeletionLagInMillis = offloadThreshold ;
2115
+ }
2116
+ }
2117
+
2061
2118
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl .create (driver , region , bucket , endpoint ,
2062
2119
s3Role , s3RoleSessionName ,
2063
2120
awsId , awsSecret ,
0 commit comments