Skip to content

Commit 116762f

Browse files
KAFKA-16016: Add docker wrapper in core and remove docker utility script (#15048)
Migrates functionality provided by utility to Kafka core. This wrapper will be used to generate property files and format storage when invoked from docker container. Reviewers: Mickael Maison <[email protected]>, Viktor Somogyi-Vass <[email protected]>, Manikumar Reddy <[email protected]>
1 parent da2aa68 commit 116762f

17 files changed

+625
-900
lines changed

build.gradle

-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ if (repo != null) {
208208
'licenses/*',
209209
'**/generated/**',
210210
'clients/src/test/resources/serializedData/*',
211-
'docker/resources/utility/go.sum',
212211
'docker/test/fixtures/secrets/*',
213212
'docker/examples/fixtures/secrets/*'
214213
])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package kafka.docker
18+
19+
import kafka.tools.StorageTool
20+
import kafka.utils.Exit
21+
import net.sourceforge.argparse4j.ArgumentParsers
22+
import net.sourceforge.argparse4j.impl.Arguments.store
23+
import net.sourceforge.argparse4j.inf.Namespace
24+
25+
import java.nio.charset.StandardCharsets
26+
import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption}
27+
28+
object KafkaDockerWrapper {
29+
def main(args: Array[String]): Unit = {
30+
val namespace = parseArguments(args)
31+
val command = namespace.getString("command")
32+
command match {
33+
case "setup" =>
34+
val defaultConfigsPath = Paths.get(namespace.getString("default_configs_dir"))
35+
val mountedConfigsPath = Paths.get(namespace.getString("mounted_configs_dir"))
36+
val finalConfigsPath = Paths.get(namespace.getString("final_configs_dir"))
37+
try {
38+
prepareConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath)
39+
} catch {
40+
case e: Throwable =>
41+
val errMsg = s"error while preparing configs: ${e.getMessage}"
42+
System.err.println(errMsg)
43+
Exit.exit(1, Some(errMsg))
44+
}
45+
46+
val formatCmd = formatStorageCmd(finalConfigsPath, envVars)
47+
StorageTool.main(formatCmd)
48+
case _ =>
49+
throw new RuntimeException(s"Unknown operation $command. " +
50+
s"Please provide a valid operation: 'setup'.")
51+
}
52+
}
53+
54+
import Constants._
55+
56+
private def parseArguments(args: Array[String]): Namespace = {
57+
val parser = ArgumentParsers.
58+
newArgumentParser("kafka-docker-wrapper", true, "-", "@").
59+
description("The Kafka docker wrapper.")
60+
61+
val subparsers = parser.addSubparsers().dest("command")
62+
63+
val setupParser = subparsers.addParser("setup")
64+
65+
setupParser.addArgument("--default-configs-dir", "-D").
66+
action(store()).
67+
required(true).
68+
help(
69+
"""Directory which holds default properties. It should contain the three file:-
70+
|server.properties, log4j.properties and tools-log4j.properties.
71+
|""".stripMargin)
72+
73+
setupParser.addArgument("--mounted-configs-dir", "-M").
74+
action(store()).
75+
required(true).
76+
help(
77+
"""Directory which holds user mounted properties. It can contain none to all the three files:-
78+
|server.properties, log4j.properties and tools-log4j.properties.""".stripMargin)
79+
80+
setupParser.addArgument("--final-configs-dir", "-F").
81+
action(store()).
82+
required(true).
83+
help(
84+
"""Directory which holds final properties. It holds the final properties that will be used to boot kafka.
85+
|""".stripMargin)
86+
87+
parser.parseArgsOrFail(args)
88+
}
89+
90+
private def formatStorageCmd(configsPath: Path, env: Map[String, String]): Array[String] = {
91+
Array("format", "--cluster-id=" + env.get("CLUSTER_ID"), "-c", s"${configsPath.toString}/server.properties")
92+
}
93+
94+
private def prepareConfigs(defaultConfigsPath: Path, mountedConfigsPath: Path, finalConfigsPath: Path): Unit = {
95+
prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
96+
prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
97+
prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars)
98+
}
99+
100+
private[docker] def prepareServerConfigs(defaultConfigsPath: Path,
101+
mountedConfigsPath: Path,
102+
finalConfigsPath: Path,
103+
env: Map[String, String]): Unit = {
104+
val propsToAdd = addNewlinePadding(getServerConfigsFromEnv(env).mkString(NewlineChar))
105+
106+
val defaultFilePath = defaultConfigsPath.resolve(s"$ServerPropsFilename")
107+
val mountedFilePath = mountedConfigsPath.resolve(s"$ServerPropsFilename")
108+
val finalFilePath = finalConfigsPath.resolve(s"$ServerPropsFilename")
109+
110+
if (Files.exists(mountedFilePath)) {
111+
copyFile(mountedFilePath, finalFilePath)
112+
addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND)
113+
} else {
114+
addToFile(propsToAdd, finalFilePath, StandardOpenOption.TRUNCATE_EXISTING)
115+
}
116+
117+
val source = scala.io.Source.fromFile(finalFilePath.toString)
118+
val data = try source.mkString finally source.close()
119+
if (data.trim.isEmpty) {
120+
copyFile(defaultFilePath, finalFilePath)
121+
}
122+
}
123+
124+
private[docker] def prepareLog4jConfigs(defaultConfigsPath: Path,
125+
mountedConfigsPath: Path,
126+
finalConfigsPath: Path,
127+
env: Map[String, String]): Unit = {
128+
val propsToAdd = getLog4jConfigsFromEnv(env)
129+
130+
val defaultFilePath = defaultConfigsPath.resolve(s"$Log4jPropsFilename")
131+
val mountedFilePath = mountedConfigsPath.resolve(s"$Log4jPropsFilename")
132+
val finalFilePath = finalConfigsPath.resolve(s"$Log4jPropsFilename")
133+
134+
copyFile(defaultFilePath, finalFilePath)
135+
copyFile(mountedFilePath, finalFilePath)
136+
137+
addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND)
138+
}
139+
140+
private[docker] def prepareToolsLog4jConfigs(defaultConfigsPath: Path,
141+
mountedConfigsPath: Path,
142+
finalConfigsPath: Path,
143+
env: Map[String, String]): Unit = {
144+
val propToAdd = getToolsLog4jConfigsFromEnv(env)
145+
146+
val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4jFilename")
147+
val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4jFilename")
148+
val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4jFilename")
149+
150+
copyFile(defaultFilePath, finalFilePath)
151+
copyFile(mountedFilePath, finalFilePath)
152+
153+
addToFile(propToAdd, finalFilePath, StandardOpenOption.APPEND)
154+
}
155+
156+
private[docker] def getServerConfigsFromEnv(env: Map[String, String]): List[String] = {
157+
env.map {
158+
case (key, value) =>
159+
if (key.startsWith("KAFKA_") && !ExcludeServerPropsEnv.contains(key)) {
160+
val final_key = key.replace("KAFKA_", "").toLowerCase()
161+
.replace("_", ".")
162+
.replace("...", "-")
163+
.replace("..", "_")
164+
final_key + "=" + value
165+
} else {
166+
""
167+
}
168+
}
169+
.toList
170+
.filterNot(_.trim.isEmpty)
171+
}
172+
173+
private[docker] def getLog4jConfigsFromEnv(env: Map[String, String]): String = {
174+
val kafkaLog4jRootLogLevelProp = env.get(KafkaLog4jRootLoglevelEnv)
175+
.filter(_.nonEmpty)
176+
.map(kafkaLog4jRootLogLevel => s"log4j.rootLogger=$kafkaLog4jRootLogLevel, stdout")
177+
.getOrElse("")
178+
179+
val kafkaLog4jLoggersProp = env.get(KafkaLog4JLoggersEnv)
180+
.filter(_.nonEmpty)
181+
.map {
182+
kafkaLog4JLoggersString =>
183+
kafkaLog4JLoggersString.split(",")
184+
.map(kafkaLog4JLogger => s"log4j.logger.$kafkaLog4JLogger")
185+
.mkString(NewlineChar)
186+
}.getOrElse("")
187+
188+
addNewlinePadding(kafkaLog4jRootLogLevelProp) + addNewlinePadding(kafkaLog4jLoggersProp)
189+
}
190+
191+
private[docker] def getToolsLog4jConfigsFromEnv(env: Map[String, String]): String = {
192+
env.get(KafkaToolsLog4jLoglevelEnv)
193+
.filter(_.nonEmpty)
194+
.map(kafkaToolsLog4jLogLevel => addNewlinePadding(s"log4j.rootLogger=$kafkaToolsLog4jLogLevel, stderr"))
195+
.getOrElse("")
196+
}
197+
198+
private def addToFile(properties: String, filepath: Path, mode: StandardOpenOption): Unit = {
199+
val path = filepath
200+
if (!Files.exists(path)) {
201+
Files.createFile(path)
202+
}
203+
Files.write(filepath, properties.getBytes(StandardCharsets.UTF_8), mode)
204+
}
205+
206+
private def copyFile(source: Path, destination: Path) = {
207+
if (Files.exists(source)) {
208+
Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING)
209+
}
210+
}
211+
212+
private def addNewlinePadding(str: String): String = {
213+
if (str.nonEmpty) {
214+
NewlineChar + str
215+
} else {
216+
""
217+
}
218+
}
219+
220+
private def envVars: Map[String, String] = sys.env
221+
}
222+
223+
private object Constants {
224+
val ServerPropsFilename = "server.properties"
225+
val Log4jPropsFilename = "log4j.properties"
226+
val ToolsLog4jFilename = "tools-log4j.properties"
227+
val KafkaLog4JLoggersEnv = "KAFKA_LOG4J_LOGGERS"
228+
val KafkaLog4jRootLoglevelEnv = "KAFKA_LOG4J_ROOT_LOGLEVEL"
229+
val KafkaToolsLog4jLoglevelEnv = "KAFKA_TOOLS_LOG4J_LOGLEVEL"
230+
val ExcludeServerPropsEnv: Set[String] = Set(
231+
"KAFKA_VERSION",
232+
"KAFKA_HEAP_OPT",
233+
"KAFKA_LOG4J_OPTS",
234+
"KAFKA_OPTS",
235+
"KAFKA_JMX_OPTS",
236+
"KAFKA_JVM_PERFORMANCE_OPTS",
237+
"KAFKA_GC_LOG_OPTS",
238+
"KAFKA_LOG4J_ROOT_LOGLEVEL",
239+
"KAFKA_LOG4J_LOGGERS",
240+
"KAFKA_TOOLS_LOG4J_LOGLEVEL",
241+
"KAFKA_JMX_HOSTNAME")
242+
val NewlineChar = "\n"
243+
}

0 commit comments

Comments
 (0)