@@ -142,6 +142,7 @@ public Optional<String> serverConfigName(String configName) {
142142 public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false ;
143143 public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false ;
144144 public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false ;
145+ public static final long NO_RETENTION_LIMIT = -1L ; // It indicates no retention limit
145146 public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2 ; // It indicates the value to be derived from RetentionBytes
146147 public static final long DEFAULT_LOCAL_RETENTION_MS = -2 ; // It indicates the value to be derived from RetentionMs
147148 public static final long DEFAULT_REMOTE_COPY_LAG_MS = 0 ;
@@ -209,7 +210,7 @@ public Optional<String> serverConfigName(String configName) {
209210 // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
210211 .define (TopicConfig .RETENTION_BYTES_CONFIG , LONG , ServerLogConfigs .LOG_RETENTION_BYTES_DEFAULT , MEDIUM , TopicConfig .RETENTION_BYTES_DOC )
211212 // can be negative. See kafka.log.LogManager.cleanupExpiredSegments
212- .define (TopicConfig .RETENTION_MS_CONFIG , LONG , DEFAULT_RETENTION_MS , atLeast (- 1 ), MEDIUM ,
213+ .define (TopicConfig .RETENTION_MS_CONFIG , LONG , DEFAULT_RETENTION_MS , atLeast (NO_RETENTION_LIMIT ), MEDIUM ,
213214 TopicConfig .RETENTION_MS_DOC )
214215 .define (TopicConfig .MAX_MESSAGE_BYTES_CONFIG , INT , ServerLogConfigs .MAX_MESSAGE_BYTES_DEFAULT , atLeast (0 ), MEDIUM ,
215216 TopicConfig .MAX_MESSAGE_BYTES_DOC )
@@ -257,8 +258,8 @@ public Optional<String> serverConfigName(String configName) {
257258 .define (TopicConfig .LOCAL_LOG_RETENTION_BYTES_CONFIG , LONG , DEFAULT_LOCAL_RETENTION_BYTES , atLeast (-2 ), MEDIUM ,
258259 TopicConfig .LOCAL_LOG_RETENTION_BYTES_DOC )
259260 .define (TopicConfig .REMOTE_LOG_COPY_DISABLE_CONFIG , BOOLEAN , false , MEDIUM , TopicConfig .REMOTE_LOG_COPY_DISABLE_DOC )
260- .define (TopicConfig .REMOTE_COPY_LAG_MS_CONFIG , LONG , DEFAULT_REMOTE_COPY_LAG_MS , atLeast (- 1 ), MEDIUM , TopicConfig .REMOTE_COPY_LAG_MS_DOC )
261- .define (TopicConfig .REMOTE_COPY_LAG_BYTES_CONFIG , LONG , DEFAULT_REMOTE_COPY_LAG_BYTES , atLeast (- 1 ), MEDIUM , TopicConfig .REMOTE_COPY_LAG_BYTES_DOC )
261+ .define (TopicConfig .REMOTE_COPY_LAG_MS_CONFIG , LONG , DEFAULT_REMOTE_COPY_LAG_MS , atLeast (MAX_REMOTE_COPY_LAG_MS ), MEDIUM , TopicConfig .REMOTE_COPY_LAG_MS_DOC )
262+ .define (TopicConfig .REMOTE_COPY_LAG_BYTES_CONFIG , LONG , DEFAULT_REMOTE_COPY_LAG_BYTES , atLeast (MAX_REMOTE_COPY_LAG_BYTES ), MEDIUM , TopicConfig .REMOTE_COPY_LAG_BYTES_DOC )
262263 .define (TopicConfig .REMOTE_LOG_DELETE_ON_DISABLE_CONFIG , BOOLEAN , false , MEDIUM , TopicConfig .REMOTE_LOG_DELETE_ON_DISABLE_DOC )
263264 .define (TopicConfig .ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG , BOOLEAN , false , MEDIUM , TopicConfig .ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC )
264265 .defineInternal (INTERNAL_SEGMENT_BYTES_CONFIG , INT , null , null , MEDIUM , INTERNAL_SEGMENT_BYTES_DOC );
@@ -600,10 +601,10 @@ private static void validateRemoteStorageRequiresDeleteCleanupPolicy(Map<String,
600601 private static void validateRemoteStorageRetentionSize (Map <String , ?> props ) {
601602 Long retentionBytes = (Long ) props .get (TopicConfig .RETENTION_BYTES_CONFIG );
602603 Long localRetentionBytes = (Long ) props .get (TopicConfig .LOCAL_LOG_RETENTION_BYTES_CONFIG );
603- if (retentionBytes > - 1 && localRetentionBytes != - 2 ) {
604- if (localRetentionBytes == - 1 ) {
605- String message = String .format ("Value must not be -1 as %s value is set as %d." ,
606- TopicConfig .RETENTION_BYTES_CONFIG , retentionBytes );
604+ if (retentionBytes > NO_RETENTION_LIMIT && localRetentionBytes != DEFAULT_LOCAL_RETENTION_BYTES ) {
605+ if (localRetentionBytes == NO_RETENTION_LIMIT ) {
606+ String message = String .format ("Value must not be %d as %s value is set as %d." ,
607+ NO_RETENTION_LIMIT , TopicConfig .RETENTION_BYTES_CONFIG , retentionBytes );
607608 throw new ConfigException (TopicConfig .LOCAL_LOG_RETENTION_BYTES_CONFIG , localRetentionBytes , message );
608609 }
609610 if (localRetentionBytes > retentionBytes ) {
@@ -617,10 +618,10 @@ private static void validateRemoteStorageRetentionSize(Map<String, ?> props) {
617618 private static void validateRemoteStorageRetentionTime (Map <String , ?> props ) {
618619 Long retentionMs = (Long ) props .get (TopicConfig .RETENTION_MS_CONFIG );
619620 Long localRetentionMs = (Long ) props .get (TopicConfig .LOCAL_LOG_RETENTION_MS_CONFIG );
620- if (retentionMs != - 1 && localRetentionMs != - 2 ) {
621- if (localRetentionMs == - 1 ) {
622- String message = String .format ("Value must not be -1 as %s value is set as %d." ,
623- TopicConfig .RETENTION_MS_CONFIG , retentionMs );
621+ if (retentionMs != NO_RETENTION_LIMIT && localRetentionMs != DEFAULT_LOCAL_RETENTION_MS ) {
622+ if (localRetentionMs == NO_RETENTION_LIMIT ) {
623+ String message = String .format ("Value must not be %d as %s value is set as %d." ,
624+ NO_RETENTION_LIMIT , TopicConfig .RETENTION_MS_CONFIG , retentionMs );
624625 throw new ConfigException (TopicConfig .LOCAL_LOG_RETENTION_MS_CONFIG , localRetentionMs , message );
625626 }
626627 if (localRetentionMs > retentionMs ) {
@@ -635,7 +636,7 @@ private static void validateRemoteCopyLagTime(Map<?, ?> props) {
635636 Long retentionMs = (Long ) props .get (TopicConfig .RETENTION_MS_CONFIG );
636637 Long localRetentionMs = (Long ) props .get (TopicConfig .LOCAL_LOG_RETENTION_MS_CONFIG );
637638 Long remoteCopyLagMs = (Long ) props .get (TopicConfig .REMOTE_COPY_LAG_MS_CONFIG );
638- long effectiveLocalRetentionMs = localRetentionMs == - 2 ? retentionMs : localRetentionMs ;
639+ long effectiveLocalRetentionMs = localRetentionMs == DEFAULT_LOCAL_RETENTION_MS ? retentionMs : localRetentionMs ;
639640 if (remoteCopyLagMs > 0 && effectiveLocalRetentionMs >= 0
640641 && remoteCopyLagMs > effectiveLocalRetentionMs ) {
641642 String message = String .format ("Value must not exceed %s (effective value: %d)" ,
@@ -648,7 +649,7 @@ private static void validateRemoteCopyLagSize(Map<?, ?> props) {
648649 Long retentionBytes = (Long ) props .get (TopicConfig .RETENTION_BYTES_CONFIG );
649650 Long localRetentionBytes = (Long ) props .get (TopicConfig .LOCAL_LOG_RETENTION_BYTES_CONFIG );
650651 Long remoteCopyLagBytes = (Long ) props .get (TopicConfig .REMOTE_COPY_LAG_BYTES_CONFIG );
651- long effectiveLocalRetentionBytes = localRetentionBytes == - 2 ? retentionBytes : localRetentionBytes ;
652+ long effectiveLocalRetentionBytes = localRetentionBytes == DEFAULT_LOCAL_RETENTION_BYTES ? retentionBytes : localRetentionBytes ;
652653 if (remoteCopyLagBytes > 0 && effectiveLocalRetentionBytes >= 0
653654 && remoteCopyLagBytes > effectiveLocalRetentionBytes ) {
654655 String message = String .format ("Value must not exceed %s (effective value: %d)" ,
0 commit comments