Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,22 @@ public class TopicConfig {
"(i.e. retention.ms/bytes).";

public static final String REMOTE_COPY_LAG_MS_CONFIG = "remote.copy.lag.ms";
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
"The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention ms as maximum delay. " +
"For how the real local retention time is computed, see <code>local.retention.ms</code>.";
public static final String REMOTE_COPY_LAG_MS_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
"A non-active segment is upload-eligible when either this time-based check or <code>remote.copy.lag.bytes</code> is satisfied. " +
"When set to 0, uploads are immediately eligible regardless of lag checks. " +
"When set to a positive value (ms), the segment is time-eligible once elapsed time since its latest record reaches this value. " +
"When set to -1, this value is derived from effective local retention time (<code>local.retention.ms</code>). " +
"If that effective local retention time is unlimited (-1), this time-based check is not applied. " +
"A positive value should not exceed effective local retention time unless local retention is unlimited (-1).";

public static final String REMOTE_COPY_LAG_BYTES_CONFIG = "remote.copy.lag.bytes";
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
"The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention bytes as maximum delay. " +
"For how the real local retention size is computed, see <code>local.retention.bytes</code>.";
public static final String REMOTE_COPY_LAG_BYTES_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
"A non-active segment is upload-eligible when either this size-based check or <code>remote.copy.lag.ms</code> is satisfied. " +
"When set to 0, uploads are immediately eligible regardless of lag checks. " +
"When set to a positive value (bytes), the segment is size-eligible once bytes of newer local log data after that segment reaches this value. " +
"When set to -1, this value is derived from effective local retention size (<code>local.retention.bytes</code>). " +
"If that effective local retention size is unlimited (-1), this size-based check is not applied. " +
"A positive value should not exceed effective local retention size unless local retention is unlimited (-1).";

public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,32 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testDynamicLogRemoteCopyLagConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, port = 8181)
val config = KafkaConfig(props)
val dynamicLogConfig = new DynamicLogConfig(mock(classOf[LogManager]), mock(classOf[DirectoryEventHandler]))
config.dynamicConfig.initialize(None)
config.dynamicConfig.addBrokerReconfigurable(dynamicLogConfig)
assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS, config.remoteLogManagerConfig.logRemoteCopyLagMs)
assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES, config.remoteLogManagerConfig.logRemoteCopyLagBytes)

// update default config
val newProps = new Properties()
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, "100")
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, "200")
config.dynamicConfig.validate(newProps, perBrokerConfig = false)
config.dynamicConfig.updateDefaultConfig(newProps)
assertEquals(100L, config.remoteLogManagerConfig.logRemoteCopyLagMs())
assertEquals(200L, config.remoteLogManagerConfig.logRemoteCopyLagBytes())

// update per broker config
config.dynamicConfig.validate(newProps, perBrokerConfig = true)
newProps.put(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, "300")
config.dynamicConfig.updateBrokerConfig(0, newProps)
assertEquals(300L, config.remoteLogManagerConfig.logRemoteCopyLagBytes())
}

@Test
def testDynamicLogRemoteCopyLagThrowsOnIncorrectConfig(): Unit = {
// log remote copy lag ms cannot exceed effective log local retention ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,26 @@ public final class RemoteLogManagerConfig {
public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;

public static final String LOG_REMOTE_COPY_LAG_MS_PROP = "log.remote.copy.lag.ms";
public static final String LOG_REMOTE_COPY_LAG_MS_DOC = "Controls how long to delay uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (ms), a segment can't become eligible for upload until the time since the latest record in the segment reaches the value. " +
"The value should not exceed the real local retention ms except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention ms as maximum delay. " +
"For how the real local retention time is computed, see <code>log.local.retention.ms</code>.";
public static final String LOG_REMOTE_COPY_LAG_MS_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
"A non-active segment is upload-eligible when either this time-based check or <code>log.remote.copy.lag.bytes</code> is satisfied. " +
"When set to 0, uploads are immediately eligible regardless of lag checks. " +
"When set to a positive value (ms), the segment is time-eligible once elapsed time since its latest record reaches this value. " +
"When set to -1, this value is derived from effective local retention time (<code>log.local.retention.ms</code>). " +
"If that effective local retention time is unlimited (-1), this time-based check is not applied. " +
"A positive value should not exceed effective local retention time unless local retention is unlimited (-1).";
public static final Long MAX_LOG_REMOTE_COPY_LAG_MS = -1L; // It indicates the value depends on log.local.retention.ms
public static final Long DEFAULT_LOG_REMOTE_COPY_LAG_MS = 0L;

public static final String LOG_REMOTE_COPY_LAG_BYTES_PROP = "log.remote.copy.lag.bytes";
public static final String LOG_REMOTE_COPY_LAG_BYTES_DOC = "Controls size-based delay for uploading segments to remote storage. " +
"When set to 0, immediate upload without any delay check. " +
"When set to a positive value (bytes), a segment can't become eligible for upload until the total bytes of log data after the segment reach the value. " +
"The value should not exceed the real local retention bytes except the latter is retained indefinitely (-1). " +
"When set to -1, resolves to the real local retention bytes as maximum delay. " +
"For how the real local retention size is computed, see <code>log.local.retention.bytes</code>.";
public static final Long DEFAULT_LOG_REMOTE_COPY_LAG_BYTES = 0L;
public static final String LOG_REMOTE_COPY_LAG_BYTES_DOC = "Controls one of the two upload eligibility checks (time and size) for copying segments to remote storage. " +
"A non-active segment is upload-eligible when either this size-based check or <code>log.remote.copy.lag.ms</code> is satisfied. " +
"When set to 0, uploads are immediately eligible regardless of lag checks. " +
"When set to a positive value (bytes), the segment is size-eligible once bytes of newer local log data after that segment reaches this value. " +
"When set to -1, this value is derived from effective local retention size (<code>log.local.retention.bytes</code>). " +
"If that effective local retention size is unlimited (-1), this size-based check is not applied. " +
"A positive value should not exceed effective local retention size unless local retention is unlimited (-1).";
public static final Long MAX_LOG_REMOTE_COPY_LAG_BYTES = -1L; // It indicates the value depends on log.local.retention.bytes
public static final Long DEFAULT_LOG_REMOTE_COPY_LAG_BYTES = MAX_LOG_REMOTE_COPY_LAG_BYTES;

public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
Expand Down Expand Up @@ -368,13 +372,13 @@ public static ConfigDef configDef() {
.define(LOG_REMOTE_COPY_LAG_MS_PROP,
LONG,
DEFAULT_LOG_REMOTE_COPY_LAG_MS,
atLeast(-1),
atLeast(MAX_LOG_REMOTE_COPY_LAG_MS),
MEDIUM,
LOG_REMOTE_COPY_LAG_MS_DOC)
.define(LOG_REMOTE_COPY_LAG_BYTES_PROP,
LONG,
DEFAULT_LOG_REMOTE_COPY_LAG_BYTES,
atLeast(-1),
atLeast(MAX_LOG_REMOTE_COPY_LAG_BYTES),
MEDIUM,
LOG_REMOTE_COPY_LAG_BYTES_DOC)
.define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.record.BrokerCompressionType;

import java.util.Collections;
Expand Down Expand Up @@ -144,10 +145,6 @@ public Optional<String> serverConfigName(String configName) {
public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false;
public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes
public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs
public static final long DEFAULT_REMOTE_COPY_LAG_MS = 0;
public static final long DEFAULT_REMOTE_COPY_LAG_BYTES = 0;
public static final long MAX_REMOTE_COPY_LAG_MS = -1; // It indicates the value depends on local retention ms
public static final long MAX_REMOTE_COPY_LAG_BYTES = -1; // It indicates the value depends on local retention bytes

public static final String INTERNAL_SEGMENT_BYTES_CONFIG = "internal.segment.bytes";
public static final String INTERNAL_SEGMENT_BYTES_DOC = "The maximum size of a single log file. This should be used for testing only.";
Expand Down Expand Up @@ -257,8 +254,8 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_MS, atLeast(-1), MEDIUM, TopicConfig.REMOTE_COPY_LAG_MS_DOC)
.define(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_BYTES, atLeast(-1), MEDIUM, TopicConfig.REMOTE_COPY_LAG_BYTES_DOC)
.define(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LONG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS, atLeast(RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_MS), MEDIUM, TopicConfig.REMOTE_COPY_LAG_MS_DOC)
.define(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LONG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES, atLeast(RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_BYTES), MEDIUM, TopicConfig.REMOTE_COPY_LAG_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
.define(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC)
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
Expand Down Expand Up @@ -418,13 +415,12 @@ public Boolean remoteLogCopyDisable() {
return remoteLogConfig.remoteLogCopyDisable;
}


public long remoteCopyLagMs() {
return remoteLogConfig.remoteCopyLagMs == MAX_REMOTE_COPY_LAG_MS ? localRetentionMs() : remoteLogConfig.remoteCopyLagMs;
return remoteLogConfig.remoteCopyLagMs == RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_MS ? localRetentionMs() : remoteLogConfig.remoteCopyLagMs;
}

public long remoteCopyLagBytes() {
return remoteLogConfig.remoteCopyLagBytes == MAX_REMOTE_COPY_LAG_BYTES ? localRetentionBytes() : remoteLogConfig.remoteCopyLagBytes;
return remoteLogConfig.remoteCopyLagBytes == RemoteLogManagerConfig.MAX_LOG_REMOTE_COPY_LAG_BYTES ? localRetentionBytes() : remoteLogConfig.remoteCopyLagBytes;
}

public long localRetentionMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,9 @@ public void testCandidateLogSegmentsUploadWhenBothRemoteCopyLagConfigsAreDefault

Map<String, Long> logProps = new HashMap<>();
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 10_000L);
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LogConfig.DEFAULT_REMOTE_COPY_LAG_MS);
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS);
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, 10_000L);
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LogConfig.DEFAULT_REMOTE_COPY_LAG_BYTES);
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES);
LogConfig logConfig = new LogConfig(logProps);
when(log.config()).thenReturn(logConfig);
when(log.logSegments(5L, Long.MAX_VALUE)).thenReturn(List.of(segment1, segment2, activeSegment));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

@SuppressWarnings("removal")
public class RemoteLogManagerConfigTest {

@Test
public void testValidConfigs() {
String rsmPrefix = "__custom.rsm.";
Expand All @@ -53,6 +54,8 @@ public void testDefaultConfigs() {
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_MS, remoteLogManagerConfigEmptyConfig.logRemoteCopyLagMs());
assertEquals(RemoteLogManagerConfig.DEFAULT_LOG_REMOTE_COPY_LAG_BYTES, remoteLogManagerConfigEmptyConfig.logRemoteCopyLagBytes());
}

@Test
Expand Down
Loading