Merged
154 commits
cb6f22f
Reduce remote storage by checking local retention
jiafu1115 Nov 18, 2025
4f96275
correct the configure name to keep same style with existed configures…
jiafu1115 Nov 19, 2025
c410ae3
fix the unit test for null point protect
jiafu1115 Nov 19, 2025
b765f33
Merge branch 'apache:trunk' into storage
jiafu1115 Jan 5, 2026
c4a77b9
correct the desc according to kip's review
jiafu1115 Jan 7, 2026
1b0705e
Clarify remote log latest enable documentation
jiafu1115 Jan 7, 2026
2cb87c7
change the naming according to KIP review
jiafu1115 Jan 29, 2026
aefdac1
add broker level configure according to KIP review
jiafu1115 Jan 29, 2026
3c5131b
fix unit test
jiafu1115 Jan 29, 2026
44d74ac
improve the code
jiafu1115 Jan 29, 2026
791881e
remove duplicated code for check
jiafu1115 Jan 30, 2026
31ecadb
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
f2b2464
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 Feb 5, 2026
5b059e1
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
4c108d3
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
05392d5
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
60060e2
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
997ecbd
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
3398d49
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
2ae53dd
KAFKA-19893: change according to the KIP review
jiafu1115 Feb 5, 2026
7ef0a9e
KAFKA-19893: refactor the name and fix unit test
jiafu1115 Feb 6, 2026
b87f0af
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
f54c033
KAFKA-19893: add protected for the configures
jiafu1115 Feb 6, 2026
ef1ee28
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
1658a70
KAFKA-19893: correct the java doc
jiafu1115 Feb 6, 2026
d99d7de
fix compile issue
jiafu1115 Feb 6, 2026
a5b6733
simple the code
jiafu1115 Feb 6, 2026
8b35773
Revert "simple the code"
jiafu1115 Feb 6, 2026
9ea9ac2
Don't change other codes
jiafu1115 Feb 7, 2026
7eefc4c
Keep the same style
jiafu1115 Feb 7, 2026
f18b95a
Merge branch 'apache:trunk' into storage
jiafu1115 Feb 7, 2026
f0adf67
code refactor
jiafu1115 Feb 7, 2026
3d14856
Refactor remote copy lag validation methods
jiafu1115 Feb 7, 2026
b6c53b6
define the constants value
jiafu1115 Feb 8, 2026
e19efef
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 8, 2026
dea59a3
refactor
jiafu1115 Feb 8, 2026
b2d2079
Code format
jiafu1115 Feb 8, 2026
cf80f85
correct the total size calculate
jiafu1115 Feb 8, 2026
a747a1a
refactor
jiafu1115 Feb 8, 2026
a392516
Merge branch 'apache:trunk' into storage
jiafu1115 Feb 10, 2026
71cdaf9
Clarify segment upload eligibility in documentation
jiafu1115 Feb 10, 2026
b5ad798
Update documentation for REMOTE_COPY_LAG_MS_CONFIG
jiafu1115 Feb 10, 2026
eaa2e40
Update documentation for remote copy lag configurations
jiafu1115 Feb 10, 2026
4ffe31f
Improve the log
jiafu1115 Feb 13, 2026
7ee6b12
Update retention checks to allow zero values
jiafu1115 Feb 13, 2026
b3b1d13
Fix comparison for remote copy lag checks
jiafu1115 Feb 13, 2026
a6bc552
Fix conditions for copy lag checks in RemoteLogManager
jiafu1115 Feb 13, 2026
1761b0a
refactor document
jiafu1115 Feb 13, 2026
31478c5
refactor document
jiafu1115 Feb 13, 2026
d493ba0
refactor document
jiafu1115 Feb 13, 2026
f994e15
correct the document
jiafu1115 Feb 13, 2026
21200a0
correct the document
jiafu1115 Feb 13, 2026
7aa8e44
correct the document again
jiafu1115 Feb 13, 2026
32ca138
Fix documentation for remote copy lag configs
jiafu1115 Feb 13, 2026
ee9e6ca
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 13, 2026
95039d4
Fix typos in TopicConfig documentation
jiafu1115 Feb 13, 2026
fc71a83
Fix documentation for remote log copy lag properties
jiafu1115 Feb 13, 2026
017deec
improve the document and implement
jiafu1115 Feb 13, 2026
5b223b1
Update LogConfig.java
jiafu1115 Feb 13, 2026
f458edd
Update comments for MAX_REMOTE_COPY_LAG constants
jiafu1115 Feb 13, 2026
2087e06
remove useless null protected judgement
jiafu1115 Feb 13, 2026
e550341
Merge branch 'trunk' into storage
jiafu1115 Feb 23, 2026
7574235
correct the judgement for upload
jiafu1115 Feb 23, 2026
f2ffe86
correct the format
jiafu1115 Feb 23, 2026
95d096b
correct the format
jiafu1115 Feb 23, 2026
b5aa4a5
correct the format
jiafu1115 Feb 23, 2026
0c4c7c3
code refactor
jiafu1115 Feb 23, 2026
f9afae2
code refactor
jiafu1115 Feb 23, 2026
1c251f0
correct the code
jiafu1115 Feb 23, 2026
7148ac5
improve the document
jiafu1115 Feb 23, 2026
47a14ab
Merge branch 'apache:trunk' into storage
jiafu1115 Mar 25, 2026
c89f5ed
Merge branch 'apache:trunk' into storage
jiafu1115 Mar 28, 2026
cf52590
refactor the code using new definition for delay configure
jiafu1115 Mar 31, 2026
7bbed2e
Correct multiple log-related imports to RemoteLogManager
jiafu1115 Mar 31, 2026
7a4ef68
Update copy lag condition checks in RemoteLogManager
jiafu1115 Mar 31, 2026
fa34452
Refactor copy lag checks in RemoteLogManager
jiafu1115 Mar 31, 2026
8f2c427
Fix the check style issue
jiafu1115 Mar 31, 2026
ab81e29
correct the configure item's default value for delay upload
jiafu1115 Apr 1, 2026
2b5fede
refactor the method position
jiafu1115 Apr 1, 2026
4050a5b
Refine documentation for retention configuration options
jiafu1115 Apr 1, 2026
a1adb06
Update RemoteLogManagerConfig.java
jiafu1115 Apr 1, 2026
f2a1949
add more validation
jiafu1115 Apr 1, 2026
4dfa05b
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 Apr 1, 2026
4dbd1de
Fix return value in copy lag condition
jiafu1115 Apr 1, 2026
5573462
Fix documentation typos in RemoteLogManagerConfig
jiafu1115 Apr 1, 2026
a5dbf91
Fix documentation for remote copy lag configuration
jiafu1115 Apr 1, 2026
3ac973d
Fix documentation for LOG_REMOTE_COPY_LAG_MS_PROP
jiafu1115 Apr 1, 2026
075ea73
add more description
jiafu1115 Apr 1, 2026
6aa8c91
improve the documents
jiafu1115 Apr 1, 2026
7fe96a4
fix code issue which cause unit test failed
jiafu1115 Apr 2, 2026
571280a
Merge branch 'apache:trunk' into storage
jiafu1115 Apr 2, 2026
bfc7530
remove example line
jiafu1115 Apr 2, 2026
a06de2b
add more debug log
jiafu1115 Apr 3, 2026
9115afd
refactor the basic log
jiafu1115 Apr 3, 2026
a23d2ea
Refactor copy lag checks in RemoteLogManager
jiafu1115 Apr 3, 2026
3d966a1
Add log configure unit test
jiafu1115 Apr 3, 2026
fba4cea
Add log configure unit test
jiafu1115 Apr 3, 2026
0a7038b
Add log configure unit test
jiafu1115 Apr 3, 2026
f377844
Fix one bug and add more tests
jiafu1115 Apr 3, 2026
02819b2
Using real constants instead of magic number
jiafu1115 Apr 4, 2026
3d7c5c9
Add more unit tests
jiafu1115 Apr 4, 2026
79d9587
improve the unit test
jiafu1115 Apr 4, 2026
d85551e
Fix typos in TopicConfig documentation
jiafu1115 May 5, 2026
bc5b395
Fix typos in RemoteLogManagerConfig documentation
jiafu1115 May 5, 2026
eec8220
Merge branch 'trunk' into storage
jiafu1115 May 5, 2026
1122220
judge logger.isDebugEnabled() before print debug
jiafu1115 May 5, 2026
2fa65e7
change debug to trace
jiafu1115 May 5, 2026
e1a9c99
improve the code for config according to code review
jiafu1115 May 5, 2026
6d481c5
add more validation configure
jiafu1115 May 7, 2026
b302dac
add document for Constraint
jiafu1115 May 7, 2026
c32d37b
Merge branch 'apache:trunk' into storage
jiafu1115 May 7, 2026
31c686a
update document
jiafu1115 May 7, 2026
8036c81
remove the logic for -1 and -2
jiafu1115 May 7, 2026
abd0563
change to 0
jiafu1115 May 7, 2026
33944d4
remove useless test
jiafu1115 May 7, 2026
ec8ff4f
correct the unit test
jiafu1115 May 7, 2026
06a93df
correct the unit test
jiafu1115 May 7, 2026
a20e4d2
correct the unit test
jiafu1115 May 7, 2026
3cd8b26
correct the unit test
jiafu1115 May 7, 2026
8a0a031
correct the implement
jiafu1115 May 7, 2026
f26f152
remove the logic for -1 and -2
jiafu1115 May 7, 2026
5a68341
Merge remote-tracking branch 'origin/storageV2' into storageV2
jiafu1115 May 7, 2026
3cb22a1
Merge pull request #3 from jiafu1115/storageV2
jiafu1115 May 7, 2026
d8b0c67
Fix condition order for copy lag checks
jiafu1115 May 7, 2026
ed9fc99
fix compile issue
jiafu1115 May 7, 2026
078db33
Fix the unit test
jiafu1115 May 7, 2026
2fa0c23
Add more test
jiafu1115 May 7, 2026
1cf1c3b
add more document for the code
jiafu1115 May 7, 2026
9b5e899
improve the document
jiafu1115 May 7, 2026
f25f002
improve the document
jiafu1115 May 7, 2026
6b626f6
improve the unit test
jiafu1115 May 7, 2026
88dbcc7
rollback part of the code of -1
jiafu1115 May 8, 2026
cf7e873
rollback part of the code of -1
May 8, 2026
d494963
improve the code according the code review
jiafu1115 May 17, 2026
4311f2c
Merge branch 'apache:trunk' into storage
jiafu1115 May 17, 2026
1855a55
refactor the code
jiafu1115 May 17, 2026
eef02ef
Merge remote-tracking branch 'origin/storage' into storage
jiafu1115 May 17, 2026
41a47dd
refactor the code
jiafu1115 May 17, 2026
69c9c9b
remove the useless unit test
jiafu1115 May 18, 2026
5880721
refactor the unit test: split to dedicated test class
jiafu1115 May 18, 2026
8f23a7c
remove the unrelated code change.
jiafu1115 May 18, 2026
6d218ff
simpler the code
May 18, 2026
d478387
refactor the code
jiafu1115 May 18, 2026
134337d
add one unit test for both lag and retention is -1
jiafu1115 May 18, 2026
499cb59
reduce the code complex
jiafu1115 May 18, 2026
cbf725c
continue to reduce the code complex
jiafu1115 May 18, 2026
40d91a7
continue to reduce the code complex
jiafu1115 May 18, 2026
20ca059
continue to reduce the code complex
jiafu1115 May 18, 2026
8f4cdd7
correct the unit test
jiafu1115 May 19, 2026
3f0764f
remove the document for -1
jiafu1115 May 19, 2026
a8ad430
simple the code according to the code review suggestion
jiafu1115 May 21, 2026
367390a
handle the special case for segment age.
jiafu1115 May 21, 2026
dbaa4d6
add more unit test
jiafu1115 May 21, 2026
9cfbc5e
Merge branch 'apache:trunk' into storage
jiafu1115 May 21, 2026
@@ -101,6 +101,22 @@ public class TopicConfig {
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";

public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
"The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention ms as maximum delay. " +
"For how the real local retention time is computed, see <code>local.retention.ms</code>.";

public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
"The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention bytes as maximum delay. " +
"For how the real local retention size is computed, see <code>local.retention.bytes</code>.";

public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
Expand Down
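The value semantics in `REMOTE_COPY_LAG_MS_DOC` can be read as a small resolution step. The sketch below is illustrative only and is not code from this PR: `CopyLagResolution` and `resolveLagMs` are hypothetical names, and it assumes that the "real local retention" is `local.retention.ms`, falling back to `retention.ms` when `local.retention.ms` is -2 (the same fallback the broker-side validation in this PR uses). The doc string does not say what happens when the lag is -1 and the local retention is itself unlimited (-1), so the sketch leaves that case unresolved.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration; not part of the PR.
final class CopyLagResolution {

    // Assumption: local.retention.ms == -2 means "fall back to retention.ms",
    // mirroring the -2 handling in the broker-side validation of this PR.
    static long effectiveLocalRetentionMs(long localRetentionMs, long retentionMs) {
        return localRetentionMs == -2L ? retentionMs : localRetentionMs;
    }

    // Returns the upload delay in ms, or empty when the doc string does not
    // say what applies (lag == -1 while the local retention is unlimited, -1).
    static OptionalLong resolveLagMs(long lagMs, long localRetentionMs, long retentionMs) {
        if (lagMs < -1L) {
            throw new IllegalArgumentException("remote.copy.lag.ms must be >= -1, got " + lagMs);
        }
        if (lagMs >= 0L) {
            return OptionalLong.of(lagMs); // 0 = upload immediately, > 0 = fixed delay
        }
        long effective = effectiveLocalRetentionMs(localRetentionMs, retentionMs);
        return effective >= 0L ? OptionalLong.of(effective) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolveLagMs(0L, 900L, 1000L));   // explicit "no delay"
        System.out.println(resolveLagMs(-1L, 900L, 1000L));  // resolves to local.retention.ms
        System.out.println(resolveLagMs(-1L, -2L, 1000L));   // resolves to retention.ms
    }
}
```

The same shape would apply to `remote.copy.lag.bytes` with `local.retention.bytes` and `retention.bytes`.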
24 changes: 24 additions & 0 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -579,6 +579,28 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE
}
}

def validateLogRemoteCopyLagMs(): Unit = {
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
}
}

def validateLogRemoteCopyLagBytes(): Unit = {
val logRetentionBytes: Long = newConfig.logRetentionBytes
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
}
}

def validateCordonedLogDirs(): Unit = {
val logDirs = newConfig.logDirs()
val cordonedLogDirs = newConfig.cordonedLogDirs()
@@ -592,6 +614,8 @@ class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryE

validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
validateLogRemoteCopyLagMs()
validateLogRemoteCopyLagBytes()
validateCordonedLogDirs()
}
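
Both validators apply the same rule, so it can be restated compactly. The following is a minimal Java sketch of that rule, not the PR's code: `CopyLagValidation` and `validate` are hypothetical names, and `IllegalArgumentException` stands in for Kafka's `ConfigException` to keep the example self-contained.

```java
// Hypothetical restatement of validateLogRemoteCopyLagMs/Bytes; not part of the PR.
final class CopyLagValidation {

    // -2 on the local retention means "use the log retention value instead".
    static final long USE_LOG_RETENTION = -2L;

    static void validate(String name, long copyLag, long localRetention, long logRetention) {
        long effective = localRetention == USE_LOG_RETENTION ? logRetention : localRetention;
        // Only a positive lag is checked, and only against a finite (>= 0) retention.
        if (copyLag > 0L && effective >= 0L && copyLag > effective) {
            throw new IllegalArgumentException(
                name + "=" + copyLag + " must not exceed the effective local retention (" + effective + ")");
        }
    }

    public static void main(String[] args) {
        validate("copy lag ms", 1000L, -2L, 1000L);   // equal to the limit: accepted
        validate("copy lag ms", 5000L, -2L, -1L);     // unlimited retention: accepted
        try {
            validate("copy lag ms", 1001L, -2L, 1000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a retention of 1000 and a local retention of -2, a lag of 1000 passes and 1001 is rejected, which matches the values used in the tests added by this PR.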

2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -628,6 +628,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
logProps
}
}
61 changes: 61 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -73,6 +73,8 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertPropertyInvalid(name, "not_a_number", "-0.1")
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG => assertPropertyInvalid(name, "not_a_number", "-2")
case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op

@@ -258,6 +260,65 @@ class LogConfigTest {
doTestInvalidLocalLogRetentionProps(2000L, -1, 100, 1000L)
}

@Test
def testInvalidRemoteCopyLagMsWhenGreaterThanEffectiveLocalRetentionMs(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "1001")

val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG))
}

@Test
def testInvalidRemoteCopyLagBytesWhenGreaterThanEffectiveLocalRetentionBytes(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "1001")

val exception = assertThrows(classOf[ConfigException], () => validateTopicLogConfig(props))
assertTrue(exception.getMessage.contains(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG))
}

@Test
def testValidRemoteCopyLagWhenBothLagChecksAreDisabled(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "-2")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "-2")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "0")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "0")

validateTopicLogConfig(props)
}

@Test
def testValidRemoteCopyLagMinusOneResolvesToLocalRetention(): Unit = {
val props = new util.HashMap[String, String]()
props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
props.put(TopicConfig.RETENTION_MS_CONFIG, "1000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "900")
props.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, "-1")
props.put(TopicConfig.RETENTION_BYTES_CONFIG, "2000")
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1800")
props.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, "-1")

validateTopicLogConfig(props)
}

private def validateTopicLogConfig(props: util.Map[String, String]): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
LogConfig.validate(util.Map.of, props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}

private def doTestInvalidLocalLogRetentionProps(localRetentionMs: Long,
localRetentionBytes: Int,
retentionBytes: Int,
@@ -770,6 +770,55 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testDynamicRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
Contributor

Could you also add a test for a valid dynamic broker config change? Thanks!

Contributor

Also, move the testDynamicRemoteCopyLagThrowsOnIncorrectConfig test to DynamicBrokerConfigTest.java instead of DynamicBrokerConfigTest.scala.

Contributor

This can be taken up later; we already have one dynamic broker config change test in KafkaConfigTest.

Contributor Author

Got it. thanks @kamalcph
Thank you very much for your patient and thorough review.

// remote copy lag ms cannot exceed effective local retention ms
verifyIncorrectRemoteCopyLagProps(
retentionMs = 1000L,
logLocalRetentionMs = -2L,
remoteCopyLagMs = 1001L,
retentionBytes = 1000L,
logLocalRetentionBytes = -2L,
remoteCopyLagBytes = 100L
)

// remote copy lag bytes cannot exceed effective local retention bytes
verifyIncorrectRemoteCopyLagProps(
retentionMs = 1000L,
logLocalRetentionMs = -2L,
remoteCopyLagMs = 100L,
retentionBytes = 1000L,
logLocalRetentionBytes = -2L,
remoteCopyLagBytes = 1001L
)

}

def verifyIncorrectRemoteCopyLagProps(retentionMs: Long,
logLocalRetentionMs: Long,
remoteCopyLagMs: Long,
retentionBytes: Long,
logLocalRetentionBytes: Long,
remoteCopyLagBytes: Long): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, retentionMs.toString)
props.put(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, retentionBytes.toString)
val config = KafkaConfig(props)
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)

val newProps = new Properties()
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs.toString)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, remoteCopyLagMs.toString)
newProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes.toString)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, remoteCopyLagBytes.toString)
// validate default config
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
// validate per broker config
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
}

@Test
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
6 changes: 6 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1047,6 +1047,8 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")
case RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "-2")

/** New group coordinator configs */
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1200,6 +1202,10 @@ class KafkaConfigTest {
assertDynamic(kafkaConfigProp, 10015L, () => config.remoteLogManagerConfig.logLocalRetentionMs)
case TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10016L, () => config.remoteLogManagerConfig.logLocalRetentionBytes)
case TopicConfig.REMOTE_COPY_LAG_MS_CONFIG =>
assertDynamic(kafkaConfigProp, 10017L, () => config.remoteLogManagerConfig.logRemoteCopyLagMs)
case TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG =>
assertDynamic(kafkaConfigProp, 10018L, () => config.remoteLogManagerConfig.logRemoteCopyLagBytes)
// not dynamically updatable
case QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG =>
// topic only config
@@ -84,7 +84,9 @@ public final class ServerTopicConfigSynonyms {
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)
sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG),
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG),
sameNameWithLogPrefix(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG)
);

/**
@@ -66,6 +66,7 @@
import org.apache.kafka.storage.internals.log.AsyncOffsetReader;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetIndex;
@@ -916,6 +917,7 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
* 1) Segment is not the active segment and
* 2) Segment end-offset is less than the last-stable-offset as remote storage should contain only
* committed/acked messages
* 3) Segment has exceeded copy lag by time or size when configured (remote.copy.lag.ms, remote.copy.lag.bytes)
* @param log The log from which the segments are to be copied
* @param fromOffset The offset from which the segments are to be copied
* @param lastStableOffset The last stable offset of the log
@@ -925,18 +927,73 @@ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, L
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
List<LogSegment> segments = log.logSegments(fromOffset, Long.MAX_VALUE);
if (!segments.isEmpty()) {
long currentTimeMs = time.milliseconds();
long totalLogSize = UnifiedLog.sizeInBytes(segments);
long cumulativeSize = 0;
for (int idx = 1; idx < segments.size(); idx++) {
LogSegment previousSeg = segments.get(idx - 1);
LogSegment currentSeg = segments.get(idx);
if (currentSeg.baseOffset() <= lastStableOffset) {
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
cumulativeSize += previousSeg.size();
if (isEligibleForUpload(log.config(), previousSeg, currentTimeMs, totalLogSize, cumulativeSize)) {
candidateLogSegments.add(new EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
} else {
break;
}
}
}
// Discard the last active segment
}
return candidateLogSegments;
}
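
Because the loop breaks at the first segment that fails `isEligibleForUpload`, the candidates always form a contiguous prefix of the non-active segments. A minimal sketch of that scan, under simplifying assumptions: segments are reduced to their sizes, every segment is taken to be below the last stable offset, and only the size rule (`totalLogSize - cumulativeSize >= copyLagBytes`) is applied. `CandidateScanSketch` and `candidates` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the scan in candidateLogSegments.
final class CandidateScanSketch {

    // Returns the indexes of the segments selected for upload. The last entry
    // of segmentSizes stands for the active segment and is never selected.
    static List<Integer> candidates(long[] segmentSizes, long copyLagBytes) {
        long totalLogSize = 0L;
        for (long size : segmentSizes) {
            totalLogSize += size;
        }
        List<Integer> selected = new ArrayList<>();
        long cumulativeSize = 0L;
        for (int idx = 1; idx < segmentSizes.length; idx++) {
            cumulativeSize += segmentSizes[idx - 1];
            // Bytes of log data that come after the segment being considered.
            long sizeLagBytes = totalLogSize - cumulativeSize;
            if (sizeLagBytes >= copyLagBytes) {
                selected.add(idx - 1);
            } else {
                break; // later segments have even less data after them
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Four 100-byte segments (the last one active) and a 200-byte copy lag.
        System.out.println(candidates(new long[] {100L, 100L, 100L, 100L}, 200L));
    }
}
```

In this model the size lag shrinks monotonically as the scan advances, which is why stopping at the first ineligible segment loses nothing for the size rule.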

private boolean isEligibleForUpload(LogConfig logConfig, LogSegment previousSeg, long currentTimeMs, long totalLogSize, long cumulativeSize) {
long copyLagMs = logConfig.remoteCopyLagMs();
long copyLagBytes = logConfig.remoteCopyLagBytes();
if (logger.isTraceEnabled()) {
logger.trace("delayCopy check for segment {}: copyLagMs={}, copyLagBytes={}, currentTimeMs={}, totalLogSize={}, cumulativeSize={}, sizeLagBytes={}",
previousSeg, copyLagMs, copyLagBytes, currentTimeMs, totalLogSize, cumulativeSize, totalLogSize - cumulativeSize);
}

if (copyLagMs == 0 || copyLagBytes == 0) {
Member

Should we log a warning or info message when a user configures one lag property but leaves the other at its default value of 0?

Contributor Author

@chia7712 Thanks for your view.
How about describing this case in the documentation? If we only log it as a warning, users may ignore it. WDYT?

Member

Sounds good. We can expand upgrade.md as well.

return true;
}

boolean limitedCopyLagMsCheck = copyLagMs > 0;
boolean limitedCopyLagSizeCheck = copyLagBytes > 0;

if (limitedCopyLagMsCheck && eligibleUploadByTime(previousSeg, currentTimeMs, copyLagMs)) {
return true;
}

return limitedCopyLagSizeCheck && eligibleUploadBySize(previousSeg, totalLogSize, cumulativeSize, copyLagBytes);
}
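
The case raised in the review thread (one lag configured, the other left at 0) follows directly from how the checks are combined. The sketch below restates only that combination, with the time and size results passed in as booleans; `UploadEligibilitySketch` and `isEligible` are hypothetical names. How a value of -1 is resolved before this point is not visible in this hunk, so negative values are not exercised here.

```java
// Hypothetical restatement of the decision order in isEligibleForUpload.
final class UploadEligibilitySketch {

    static boolean isEligible(long copyLagMs, long copyLagBytes,
                              boolean agedEnough, boolean enoughBytesAfter) {
        // A lag of 0 on either property means "no delay", regardless of the other.
        if (copyLagMs == 0L || copyLagBytes == 0L) {
            return true;
        }
        // Otherwise the segment is eligible as soon as either configured check passes.
        if (copyLagMs > 0L && agedEnough) {
            return true;
        }
        return copyLagBytes > 0L && enoughBytesAfter;
    }

    public static void main(String[] args) {
        // One-hour time lag, size lag left at 0: the time lag has no effect.
        System.out.println(isEligible(3_600_000L, 0L, false, false));
        // Both lags set: neither check has passed yet, so the upload is delayed.
        System.out.println(isEligible(3_600_000L, 1024L, false, false));
    }
}
```

In other words, a delay only takes effect when both properties are non-zero, which is the behavior the reviewers ask to document.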

private boolean eligibleUploadByTime(LogSegment segment, long currentTimeMs, long copyLagMs) {
try {
long segmentAgeMs = currentTimeMs - segment.largestTimestamp();
boolean eligibleUpload = segmentAgeMs < 0 || segmentAgeMs >= copyLagMs;
Member

This is a lingering issue. The local segment with a future timestamp is still NOT deleted, right? Should we allow the deletion of these local files once they have been successfully uploaded to remote storage, even if they contain future timestamps?

Contributor

> Should we allow the deletion of these local files once they have been successfully uploaded to remote storage, even if they contain future timestamps?

Currently, it gets deleted based on the local retention bytes limit, since the local retention deletion logic follows the same approach as the full deletion logic.

https://sourcegraph.com/r/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java?L2008

Member

The current behavior looks like a bug to me, as local segments shouldn't be blocked from deletion once they are already uploaded. Perhaps we could fall back to checking the lastModifiedTime when a segment contains future records. Alternatively, we could introduce a new configuration flag to provide alternative logic for handling future records.

Contributor

> shouldn't be blocked from deletion once they are already uploaded. Perhaps we could fall back to checking the lastModifiedTime when a segment contains future records.

Yeah, we can allow this behavior by introducing a config. The segment exists in remote storage, but users might see slow reads from remote if the Remote Storage Manager does not implement prefetching. So it is better to gate the change in behavior behind a config, applied only when remote storage is enabled on the topic.

Member

If we all hate -2, there is another approach: reverse the configuration logic.

For example, we could introduce remote.copy.before.deletion.ms instead of remote.copy.lag.ms. If a user sets remote.copy.before.deletion.ms=2hr, it means they want to upload log segments 2 hours before they are scheduled to be deleted from local storage.

The default value would be -1, which dynamically resolves to local.retention.ms. This fully maintains backward compatibility (resulting in immediate upload). Setting it to 0 means the user wants to delay the upload as much as possible, triggering it right before local deletion.

The only side effect is that this kind of "reverse" or "countdown" time calculation is quite rare in the existing Kafka codebase.
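
To make the "countdown" arithmetic of this proposal concrete, here is a small sketch that converts the proposed `remote.copy.before.deletion.ms` into the equivalent upload delay. Everything in it is hypothetical: the property was only suggested in this thread, `ReverseLagSketch` and `equivalentLagMs` are made-up names, and the sketch assumes a finite local retention.

```java
// Hypothetical illustration of the proposed remote.copy.before.deletion.ms semantics.
final class ReverseLagSketch {

    // Returns how long after the latest record the upload would start.
    static long equivalentLagMs(long localRetentionMs, long beforeDeletionMs) {
        if (localRetentionMs < 0L) {
            throw new IllegalArgumentException("sketch assumes a finite local retention");
        }
        // -1 resolves to the local retention itself, i.e. upload immediately.
        long before = beforeDeletionMs == -1L ? localRetentionMs : beforeDeletionMs;
        if (before < 0L || before > localRetentionMs) {
            throw new IllegalArgumentException("value must be -1 or within [0, local retention]");
        }
        return localRetentionMs - before;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        System.out.println(equivalentLagMs(6 * hour, 2 * hour)); // upload 2h before local deletion
        System.out.println(equivalentLagMs(6 * hour, -1L));      // default: immediate upload
        System.out.println(equivalentLagMs(6 * hour, 0L));       // delay as long as possible
    }
}
```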

Contributor

@kamalcph kamalcph May 25, 2026

> The only side effect is that this kind of "reverse"

yeah, this is not intuitive when compared with the other configs like retention time/bytes. My suggestion is to:

  1. Improve the config documentation about configuring both the values and
  2. Add validation to throw an error when remote-copy-lag-time = 0 and remote-copy-lag-bytes != 0 and vice-versa.
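
The second suggestion could look roughly like the sketch below. It is only an illustration of the rule as worded in this comment, not something this PR implements: `BothOrNeitherValidation` and `validate` are hypothetical names, and `IllegalArgumentException` stands in for Kafka's `ConfigException`.

```java
// Hypothetical sketch of the suggested "both zero or both non-zero" validation.
final class BothOrNeitherValidation {

    static void validate(long copyLagMs, long copyLagBytes) {
        boolean timeDisabled = copyLagMs == 0L;
        boolean sizeDisabled = copyLagBytes == 0L;
        // Reject configurations where exactly one of the two lags is 0,
        // since the non-zero one would silently have no effect.
        if (timeDisabled != sizeDisabled) {
            throw new IllegalArgumentException(
                "remote.copy.lag.ms=" + copyLagMs + " and remote.copy.lag.bytes=" + copyLagBytes
                    + " must be both 0 or both non-zero");
        }
    }

    public static void main(String[] args) {
        validate(0L, 0L);            // eager upload: accepted
        validate(3_600_000L, 1024L); // both lags set: accepted
        try {
            validate(3_600_000L, 0L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```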

Member

I just realized that I replied to the wrong thread earlier, sorry about that 😢

> How about this solution? If we find that it contains future records, we use LogSegment#lastModified to compare the time?

Yes, that is an acceptable approach. I've opened https://issues.apache.org/jira/browse/KAFKA-20609 so we can keep discussing there.

> yeah, this is not intuitive when compared with the other configs like retention time/bytes. My suggestion is to:

While documentation is the final line of defense for users, it's better to have an intuitive design out of the box. After all, my proposed reverse configuration was rejected precisely because it wasn't intuitive enough.

Another way is to align the logic with retention.ms and retention.bytes. Since most users care more about time than size, we could set the default value of the time lag to 0, and the size lag to -1. This way, most users can just adjust the time setting without having to touch the size configuration. WDYT?

Contributor

@kamalcph kamalcph May 26, 2026

  • Thanks for filing the KAFKA-20609 ticket. Fixing this is out of scope for KIP-1241, as the bug existed before it too.

> Since most users care more about time than size, we could set the default value of the time lag to 0, and the size lag to -1.

I like the idea of providing out-of-the-box default values that work in the majority of use cases and retain the eager upload logic. The new default values for remote-copy lag align with log.retention.hours = 7 days and log.retention.bytes = infinite (-1). LGTM.

Contributor Author

@kamalcph @chia7712 ack
Let me take time to think and handle it. Thanks

Contributor

Opened #22394 to address this comment. PTAL.

if (logger.isTraceEnabled()) {
logger.trace("{} eligible for upload by time? {} (segment age {} ms, copy lag {} ms)",
segment, eligibleUpload, segmentAgeMs, copyLagMs);
}
return eligibleUpload;
} catch (IOException e) {
logger.warn("Failed to get largest timestamp for segment {}, take it as eligible for upload based on time", segment, e);
return true;
}
}
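
The `segmentAgeMs < 0` branch is what the thread above is about: a segment whose largest timestamp lies in the future is treated as eligible instead of blocking the upload. A minimal sketch of just that comparison, with the timestamp passed in directly rather than read from a `LogSegment`; `TimeLagSketch` and `eligibleByTime` are hypothetical names.

```java
// Hypothetical, I/O-free restatement of the comparison in eligibleUploadByTime.
final class TimeLagSketch {

    static boolean eligibleByTime(long largestTimestampMs, long nowMs, long copyLagMs) {
        long segmentAgeMs = nowMs - largestTimestampMs;
        // A negative age means the segment holds a record with a future timestamp;
        // it is treated as eligible so it cannot hold back the upload.
        return segmentAgeMs < 0L || segmentAgeMs >= copyLagMs;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        System.out.println(eligibleByTime(9_500L, now, 600L));  // age 500 ms, lag 600 ms
        System.out.println(eligibleByTime(9_400L, now, 600L));  // age 600 ms, lag 600 ms
        System.out.println(eligibleByTime(12_000L, now, 600L)); // future timestamp
    }
}
```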

private boolean eligibleUploadBySize(LogSegment segment, long totalLogSize, long cumulativeSize, long copyLagBytes) {
long sizeLagBytes = totalLogSize - cumulativeSize;
boolean eligibleUpload = sizeLagBytes >= copyLagBytes;
if (logger.isTraceEnabled()) {
logger.trace("{} eligible for upload by size? {} (size lag {} bytes, copy lag {} bytes, totalLogSize={}, cumulativeSize={})",
segment, eligibleUpload, sizeLagBytes, copyLagBytes, totalLogSize, cumulativeSize);
}
return eligibleUpload;
}

public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException, RetriableRemoteStorageException {
if (isCancelled())
return;