Upgrade Kafka to 4.1.1 #2342
Conversation
Force-pushed from 6a07fb1 to 1d2fd41
Change-Id: I689c3002a417f3e3bd319f714df3bf6a1994b8c0
  org.gradle.jvmargs=-Xms512m -Xmx512m
  scalaVersion=2.13.13
- kafkaVersion=4.0.0
+ kafkaVersion=4.1.1
Can you please also remove the now-unneeded gradle override in https://github.com/linkedin/cruise-control/pull/2286/files#diff-49a96e7eea8a94af862798a45174e6ac43eb4f8b4bd40759b5da63ba31ec3ef7R311?
See #2354
Change-Id: I605570ae67238e2a9270531628625045cdbbccdd
Force-pushed from 1d2fd41 to 9cf8e39
  readLogDirs(config);
  _id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
- _metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
+ _metadataLogDir = new File((String) config.get(MetadataLogConfig.METADATA_LOG_DIR_CONFIG));
In an effort to avoid depending on internal/private Kafka APIs like MetadataLogConfig [1], could we create a field like the following:
public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";
in the KafkaServerConfigs class [2] instead?
[1] #2282
[2] https://github.com/linkedin/cruise-control/blob/main/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/KafkaServerConfigs.java
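The suggested change could look roughly like the sketch below. Only the constant itself comes from the suggestion above; the class shape (final class, private constructor) is an assumption about how the linked KafkaServerConfigs test utility is structured. The string value mirrors Kafka's public `metadata.log.dir` config name.

```java
// Sketch of the proposed addition to the KafkaServerConfigs test utility,
// so test code does not need Kafka's internal MetadataLogConfig class.
public final class KafkaServerConfigs {
    // Mirrors the public Kafka config name "metadata.log.dir".
    public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";

    private KafkaServerConfigs() {
        // Utility class; no instances.
    }
}
```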
@kyguy, do you think there's some way we can clearly instruct people to avoid adding dependencies on the internal Kafka APIs? Doing so would start nullifying your previous work to decouple CC from them.
Maybe a README heads-up, maybe a contribution guide, maybe a PR template hint...
> Maybe a README heads-up, maybe a contribution guide, maybe a PR template hint...
Those all sound like good ideas to me. It'll definitely be a lot easier after we have finished the migration off of internal Kafka APIs and have removed the related dependencies from the gradle file. Then we could have some sort of check in the PR template to make sure no internal Kafka dependencies have been added to the gradle file as part of the PR.
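A check along those lines could be sketched as below. This is only an illustration: the class name is made up, and the FORBIDDEN artifact coordinates are hypothetical examples, not the project's actual deny list.

```java
import java.util.List;

// Rough sketch of a check that scans gradle dependency lines for internal
// Kafka artifacts. The FORBIDDEN entries are hypothetical examples.
public class InternalKafkaDepCheck {
    private static final List<String> FORBIDDEN = List.of(
            "org.apache.kafka:kafka_2.13",     // server internals
            "org.apache.kafka:kafka-storage"); // storage internals

    static boolean hasForbiddenDependency(List<String> gradleLines) {
        // Flag any dependency line containing a forbidden artifact coordinate.
        return gradleLines.stream()
                .anyMatch(line -> FORBIDDEN.stream().anyMatch(line::contains));
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "implementation \"org.apache.kafka:kafka-clients:4.1.1\"",
                "testImplementation \"org.apache.kafka:kafka_2.13:4.1.1\"");
        System.out.println(hasForbiddenDependency(lines)); // true: kafka_2.13 matched
    }
}
```

In a real setup this would run in CI against the diff of the gradle file rather than the whole file.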
  props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
  props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
- props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());
+ props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());

  private void initializeMetaData(KafkaConfig config) {
    Set<File> allLogDirs = new HashSet<>(readLogDirs(config));
-   allLogDirs.add(new File(config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)));
+   allLogDirs.add(new File(config.getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)));
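To illustrate the decoupling idea raised in the review comments, here is a minimal sketch that resolves the metadata log dir from a plain string key instead of the internal MetadataLogConfig class. The resolver class, helper name, and use of java.util.Properties are all hypothetical; only the `metadata.log.dir` key comes from the discussion above.

```java
import java.io.File;
import java.util.Properties;

// Sketch: read "metadata.log.dir" via a plain string constant so test
// utilities do not import Kafka's internal MetadataLogConfig class.
public class MetadataLogDirResolver {
    static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";

    static File metadataLogDir(Properties config) {
        return new File(config.getProperty(METADATA_LOG_DIR_CONFIG));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(METADATA_LOG_DIR_CONFIG, "/tmp/kraft-metadata");
        System.out.println(metadataLogDir(props).getPath()); // /tmp/kraft-metadata
    }
}
```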
+ @Ignore("Container stop and start will destroy Kafka broker and create a new one so this test is not valid in this form.")
  @Test
  public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {
Even if we are recreating a broker here, isn't the test still valid? From what I understand, the Cruise Control metric reporter agent of the recreated broker will update the metrics topic config, won't it?
I think Testcontainers is not OK with it. I tested this multiple times and it seems flaky: it can get stuck in a bad state (the still-available node tries to connect to the other node, which no longer exists, and the destroyed node cannot be created again due to Kafka errors).
It may need a deeper investigation, but I disabled this test for now, since it was very flaky on my PRs.
I took a closer look at this today and was able to reproduce the issue on my side. It appears that this test was never fully migrated for KRaft clusters. Currently, it sets up a cluster with only two brokers, so when the test restarts one of them, the quorum is temporarily lost. As a result, the check for the topic metadata change times out while waiting for the quorum to recover.
From what I understand, Kafka 4.x enforces stricter Raft semantics compared to Kafka 3.9.1, which is why this test used to pass consistently before the upgrade.
To address this issue we can create a cluster with 3 brokers so that the quorum isn't lost when one of the brokers is restarted. We can do this by adding the following to the class:
private static final int NUM_OF_BROKERS = 3;

@Override
protected int clusterSize() {
  return NUM_OF_BROKERS;
}
With this change the test seems to pass consistently on my side, let me know what you think and/or if it works for you!
Thanks for investigating it. I applied this fix in a new commit: since the flakiness became more frequent with Kafka 4+, it should be fixed in this PR.
  import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
  import org.apache.kafka.server.metrics.KafkaYammerMetrics;
  import org.apache.kafka.storage.internals.log.LogConfig;
  import org.apache.kafka.storage.internals.log.UnifiedLog;
Same comment here as well: is there a way we can avoid using private Kafka APIs?
Change-Id: Ic258099bb3d55c44696aff7aa985fdb3cc43bec1
Change-Id: I3288df802040d66783d57467ea6b6e2cf4e8ee4e
@bgrishinko
Summary
Bump Kafka version to 4.1.1.
Categorization
This PR resolves #2341.