forked from apache/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathKafkaRaftServer.scala
More file actions
193 lines (175 loc) · 7.98 KB
/
KafkaRaftServer.scala
File metadata and controls
193 lines (175 loc) · 7.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.utils.{CoreUtils, Logging, Mx4jLoader}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
import org.slf4j.Logger
import java.util
import java.util.{Optional, OptionalInt}
import scala.jdk.CollectionConverters._
/**
* This class implements the KRaft (Kafka Raft) mode server which relies
* on a KRaft quorum for maintaining cluster metadata. It is responsible for
* constructing the controller and/or broker based on the `process.roles`
* configuration and for managing their basic lifecycle (startup and shutdown).
*
*/
class KafkaRaftServer(
config: KafkaConfig,
time: Time,
) extends Server with Logging {
this.logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] "
private val (metaPropsEnsemble, bootstrapMetadata) =
KafkaRaftServer.initializeLogDirs(config, this.logger.underlying, this.logIdent)
private val metrics = Server.initializeMetrics(
config,
time,
metaPropsEnsemble.clusterId().get()
)
private val sharedServer = new SharedServer(
config,
metaPropsEnsemble,
time,
metrics,
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
new StandardFaultHandlerFactory(),
ServerSocketFactory.INSTANCE,
)
private val broker: Option[BrokerServer] = if (config.processRoles.contains(ProcessRole.BrokerRole)) {
Some(new BrokerServer(sharedServer))
} else {
None
}
private val controller: Option[ControllerServer] = if (config.processRoles.contains(ProcessRole.ControllerRole)) {
Some(new ControllerServer(
sharedServer,
KafkaRaftServer.configSchema,
bootstrapMetadata,
))
} else {
None
}
override def startup(): Unit = {
Mx4jLoader.maybeLoad()
// Controller component must be started before the broker component so that
// the controller endpoints are passed to the KRaft manager
controller.foreach(_.startup())
broker.foreach(_.startup())
AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
info(KafkaBroker.STARTED_MESSAGE)
}
override def shutdown(): Unit = {
// In combined mode, we want to shut down the broker first, since the controller may be
// needed for controlled shutdown. Additionally, the controller shutdown process currently
// stops the raft client early on, which would disrupt broker shutdown.
broker.foreach(_.shutdown())
controller.foreach(_.shutdown())
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics), this)
}
override def awaitShutdown(): Unit = {
broker.foreach(_.awaitShutdown())
controller.foreach(_.awaitShutdown())
}
}
object KafkaRaftServer {
val MetadataTopic = Topic.CLUSTER_METADATA_TOPIC_NAME
val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
val MetadataTopicId = Uuid.METADATA_TOPIC_ID
/**
* Initialize the configured log directories, including both [[KRaftConfigs.MetadataLogDirProp]]
* and [[KafkaConfig.LOG_DIR_PROP]]. This method performs basic validation to ensure that all
* directories are accessible and have been initialized with consistent `meta.properties`.
*
* @param config The process configuration
* @return A tuple containing the loaded meta properties (which are guaranteed to
* be consistent across all log dirs) and the offline directories
*/
def initializeLogDirs(
config: KafkaConfig,
log: Logger,
logPrefix: String
): (MetaPropertiesEnsemble, BootstrapMetadata) = {
// Load and verify the original ensemble.
val loader = new MetaPropertiesEnsemble.Loader()
loader.addMetadataLogDir(config.metadataLogDir)
.addLogDirs(config.logDirs)
val initialMetaPropsEnsemble = loader.load()
val verificationFlags = util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR)
initialMetaPropsEnsemble.verify(Optional.empty(), OptionalInt.of(config.nodeId), verificationFlags)
// Check that the __cluster_metadata-0 topic does not appear outside the metadata directory.
val metadataPartitionDirName = UnifiedLog.logDirName(MetadataPartition)
initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir => {
if (!logDir.equals(config.metadataLogDir)) {
val clusterMetadataTopic = new File(logDir, metadataPartitionDirName)
if (clusterMetadataTopic.exists) {
throw new KafkaException(s"Found unexpected metadata location in data directory `$clusterMetadataTopic` " +
s"(the configured metadata directory is ${config.metadataLogDir}).")
}
}
})
// Set directory IDs on all directories. Rewrite the files if needed.
val metaPropsEnsemble = {
val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
val logDir = e.getKey
val metaProps = e.getValue
if (!metaProps.isPresent()) {
throw new RuntimeException(s"No `meta.properties` found in $logDir (have you run `kafka-storage.sh` " +
"to format the directory?)")
}
if (!metaProps.get().nodeId().isPresent()) {
throw new RuntimeException(s"Error: node ID not found in $logDir")
}
if (!metaProps.get().clusterId().isPresent()) {
throw new RuntimeException(s"Error: cluster ID not found in $logDir")
}
val builder = new MetaProperties.Builder(metaProps.get())
if (!builder.directoryId().isPresent()) {
builder.setDirectoryId(copier.generateValidDirectoryId())
}
copier.setLogDirProps(logDir, builder.build())
copier.setPreWriteHandler((logDir, _, _) => {
log.info("{}Rewriting {}{}meta.properties", logPrefix, logDir, File.separator)
})
})
copier.writeLogDirChanges()
copier.copy()
}
// Load the BootstrapMetadata.
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir)
val bootstrapMetadata = bootstrapDirectory.maybeReadLegacyBootstrapCheckpoint()
(metaPropsEnsemble, bootstrapMetadata)
}
val configSchema = new KafkaConfigSchema(Map(
ConfigResource.Type.BROKER -> new ConfigDef(KafkaConfig.configDef),
ConfigResource.Type.TOPIC -> LogConfig.configDefCopy,
).asJava, ServerTopicConfigSynonyms.ALL_TOPIC_CONFIG_SYNONYMS)
}