KAFKA-19080 The constraint on segment.ms is not enforced at topic level #19371
@@ -575,6 +575,7 @@ final class KafkaMetadataLog private (
}

object KafkaMetadataLog extends Logging {

def apply(
topicPartition: TopicPartition,
topicId: Uuid,
@@ -583,35 +584,72 @@ object KafkaMetadataLog extends Logging {
scheduler: Scheduler,
config: MetadataLogConfig
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT.toString)

// Disable time and byte retention when deleting segments
props.setProperty(TopicConfig.RETENTION_MS_CONFIG, "-1")
props.setProperty(TopicConfig.RETENTION_BYTES_CONFIG, "-1")
val props: Properties = settingLogProperties(config)
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)

if (config.logSegmentBytes < config.logSegmentMinBytes) {
Review comment: IIUC, this logic has been replaced by the config validation system. SEGMENT_BYTES_CONFIG has a minimum of 1 MB but can be overridden by INTERNAL_SEGMENT_BYTES_CONFIG, which has no minimum.
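For reference, this is a minimal sketch of how such a pair can be expressed with Kafka's ConfigDef: a public key whose documented minimum is enforced by a validator, plus an internal key with no validator that tests can use to force tiny segments. The key names, default, and doc string below are illustrative assumptions, not the PR's actual declarations.

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Range, Type}

object SegmentBytesConfigSketch {
  // Public key: the documented 1 MiB minimum is enforced by the validator.
  // Internal key: no validator, so tests can set arbitrarily small segments; null default means "unset".
  val Def: ConfigDef = new ConfigDef()
    .define("segment.bytes", Type.INT, 1024 * 1024 * 1024, Range.atLeast(1024 * 1024),
      Importance.HIGH, "The hard maximum size of a single log segment file.")
    .defineInternal("internal.segment.bytes", Type.INT, null, Importance.LOW)
}

With a pair like this in place, the hand-written logSegmentMinBytes check shown below becomes redundant, which is what the reviewer is pointing at.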
throw new InvalidConfigurationException(
s"Cannot set ${KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
)
} else if (defaultLogConfig.retentionMs >= 0) {
throw new InvalidConfigurationException(
s"Cannot set ${TopicConfig.RETENTION_MS_CONFIG} above -1: ${defaultLogConfig.retentionMs}."
)
} else if (defaultLogConfig.retentionSize >= 0) {
throw new InvalidConfigurationException(
s"Cannot set ${TopicConfig.RETENTION_BYTES_CONFIG} above -1: ${defaultLogConfig.retentionSize}."
)
validateConfig(config, defaultLogConfig)

val metadataLog: KafkaMetadataLog = createKafkaMetadataLog(topicPartition, topicId, dataDir, time, scheduler, config, defaultLogConfig)

printWarningMessage(config, metadataLog)

// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
// when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully.
metadataLog.truncateToLatestSnapshot()

metadataLog
}

private def printWarningMessage(config: MetadataLogConfig, metadataLog: KafkaMetadataLog): Unit = {
// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
metadataLog.error(s"Overriding ${KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
s"this value too low may lead to an inability to write batches of metadata records.")
}
}

// visible for testing
def internalApply(
Review comment: Instead of having an …

Review comment: +1 to move the internal config to …

Review comment: I just realized that the metadata log uses a different approach to allow tests to use a smaller segment bytes than allowed in production. That approach defines the original segment bytes config with a small minimum requirement, but adds METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG to enforce the actual minimum requirement in production. The new config can be changed in tests to allow a smaller minimum. The benefit of this approach is that the existing config can be used directly to set a smaller value for tests. The downside is that the documented min value is inaccurate and the validation is done through customized logic. It would be useful to pick the same strategy for the metadata log and the regular log. The metadata log approach seems slightly better since it is less intrusive. We could fix the inaccurate min value description for production somehow.

Review comment: Excuse me, the strategy used by the metadata log is to add an "internal" config (…

Review comment: I am just saying that we now have two different ways to achieve the same goal. In the metadata log approach, you set the desired value through the original config, which is segment.bytes, and then set an internal config to change the min constraint. The approach in this PR is to set the desired value through a different internal config. It would be useful to choose the same approach for both the metadata log and the regular log.

Review comment: Personally, I prefer the approach of adding an "internal.xxx" config, as it provides a better user experience for public configs, allowing users to see the "correct" min value. Additionally, we can remove the customized validation logic. In short, I suggest adding the following changes to this PR:

public int logSegmentBytes() {
    if (internalLogSegmentBytes != null) return internalLogSegmentBytes;
    return logSegmentBytes;
}

Review comment: Hmm, why do we need …
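To make the two strategies in this thread concrete, here is a hypothetical sketch of how a test might lower the segment size under each of them. The literal values are arbitrary, and the package of KRaftConfigs is assumed; the config key names are the ones that appear in this diff.

import java.util.Properties
import org.apache.kafka.server.config.KRaftConfigs   // package assumed
import org.apache.kafka.storage.internals.log.LogConfig

// Regular log (approach in this PR): the test-only value travels through a separate internal key.
val regularLogOverride = new Properties()
regularLogOverride.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "10240")

// Metadata log (existing approach): the value stays on the public key,
// and an internal key relaxes the enforced minimum.
val metadataLogOverride = new Properties()
metadataLogOverride.setProperty(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG, "10240")
metadataLogOverride.setProperty(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, "10240")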

topicPartition: TopicPartition,
topicId: Uuid,
dataDir: File,
time: Time,
scheduler: Scheduler,
config: MetadataLogConfig
): KafkaMetadataLog = {
val props: Properties = settingLogProperties(config)
props.remove(TopicConfig.SEGMENT_BYTES_CONFIG);
props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)

validateConfig(config, defaultLogConfig)

val metadataLog: KafkaMetadataLog = createKafkaMetadataLog(topicPartition, topicId, dataDir, time, scheduler, config, defaultLogConfig)

// Print a warning if users have overridden the internal config
printWarningMessage(config, metadataLog)

// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
// when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully.
metadataLog.truncateToLatestSnapshot()

metadataLog
}

private def createKafkaMetadataLog(
topicPartition: TopicPartition,
topicId: Uuid,
dataDir: File,
time: Time,
scheduler: Scheduler,
metadataLogConfig: MetadataLogConfig,
logConfig: LogConfig
): KafkaMetadataLog = {
val log = UnifiedLog.create(
dataDir,
defaultLogConfig,
logConfig,
0L,
0L,
scheduler,
@@ -631,20 +669,38 @@ object KafkaMetadataLog extends Logging {
scheduler,
recoverSnapshots(log),
topicPartition,
config
metadataLogConfig
)
metadataLog
}

// Print a warning if users have overridden the internal config
if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
Review comment: Should we keep the error message if INTERNAL_SEGMENT_BYTES_CONFIG has been set? This was added to help operators avoid setting these testing-only configs in a production environment.
metadataLog.error(s"Overriding ${KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
s"this value too low may lead to an inability to write batches of metadata records.")
private def validateConfig(config: MetadataLogConfig, defaultLogConfig: LogConfig): Unit = {
if (config.logSegmentBytes < config.logSegmentMinBytes) {
throw new InvalidConfigurationException(
s"Cannot set ${KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
)
} else if (defaultLogConfig.retentionMs >= 0) {
throw new InvalidConfigurationException(
s"Cannot set ${TopicConfig.RETENTION_MS_CONFIG} above -1: ${defaultLogConfig.retentionMs}."
)
} else if (defaultLogConfig.retentionSize >= 0) {
throw new InvalidConfigurationException(
s"Cannot set ${TopicConfig.RETENTION_BYTES_CONFIG} above -1: ${defaultLogConfig.retentionSize}."
)
}
}

// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
// when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully.
metadataLog.truncateToLatestSnapshot()
private def settingLogProperties(config: MetadataLogConfig) = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT.toString)

metadataLog
// Disable time and byte retention when deleting segments
props.setProperty(TopicConfig.RETENTION_MS_CONFIG, "-1")
props.setProperty(TopicConfig.RETENTION_BYTES_CONFIG, "-1")
props
}

private def recoverSnapshots(
@@ -33,6 +33,7 @@ import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PR
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
@@ -566,7 +567,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
client.createAcls(List(denyAcl).asJava, new CreateAclsOptions()).all().get()

val topics = Seq(topic1, topic2)
val configsOverride = Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "100000").asJava
val configsOverride = Map(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG -> "100000").asJava
Review comment: This test case is used to verify the custom topic-level config, so we can increase the value to make the test pass.
val newTopics = Seq(
new NewTopic(topic1, 2, 3.toShort).configs(configsOverride),
new NewTopic(topic2, Option.empty[Integer].toJava, Option.empty[java.lang.Short].toJava).configs(configsOverride))

@@ -579,7 +580,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertEquals(3, result.replicationFactor(topic1).get())
val topicConfigs = result.config(topic1).get().entries.asScala
assertTrue(topicConfigs.nonEmpty)
val segmentBytesConfig = topicConfigs.find(_.name == TopicConfig.SEGMENT_BYTES_CONFIG).get
val segmentBytesConfig = topicConfigs.find(_.name == LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG).get
assertEquals(100000, segmentBytesConfig.value.toLong)
assertEquals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, segmentBytesConfig.source)
val compressionConfig = topicConfigs.find(_.name == TopicConfig.COMPRESSION_TYPE_CONFIG).get
Review comment: This is a critical bug; opened https://issues.apache.org/jira/browse/KAFKA-19392 to fix it.
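To make the topic-level enforcement behind KAFKA-19080 concrete, here is a hypothetical test-style sketch of the behaviour this PR is after: a public segment.bytes override below the 1 MiB minimum should be rejected at topic creation. The topic name, literal values, the reuse of "client" as an existing Admin instance, and the exact exception surfaced by the admin client are all assumptions, not assertions about the PR.

import java.util.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}

// Hypothetical sketch: "client" is an existing Admin instance, as in the test above.
val tooSmall = Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "100000").asJava // below the 1 MiB minimum
val result = client.createTopics(Seq(new NewTopic("segment-bytes-too-small", 1, 1.toShort).configs(tooSmall)).asJava)
val e = assertThrows(classOf[ExecutionException], () => result.all().get())
// Assumed to surface as InvalidConfigurationException once the constraint is enforced at topic level.
assertTrue(e.getCause.isInstanceOf[InvalidConfigurationException])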