Skip to content

Commit c7feea8

Browse files
authored
Pass kafka parameters through (#76)
* Pass kafka parameters through
* Add comment
* Add docs
1 parent 9436a15 commit c7feea8

File tree

3 files changed

+17
-4
lines changed

3 files changed

+17
-4
lines changed

docs/Flight_recorder_mode_KafkaSink.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ Configuration - KafkaSink parameters:
3636
--conf spark.sparkmeasure.kafkaTopic = Kafka topic
3737
Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo
3838
Note: the topic will be created if it does not yet exist
39+
--conf spark.sparkmeasure.kafka.* = Other kafka properties
40+
Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
3941
```
4042

4143
This code depends on "kafka-clients". If you deploy sparkMeasure from maven central,

src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import scala.util.Try
2727
* example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
2828
* spark.sparkmeasure.kafkaTopic = Kafka topic
2929
* example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo
30+
* spark.sparkmeasure.kafka.* = Other kafka properties
31+
* example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
3032
*
3133
* This code depends on "kafka clients", you may need to add the dependency:
3234
* --packages org.apache.kafka:kafka-clients:3.2.1
@@ -39,7 +41,7 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
3941
logger.warn("Custom monitoring listener with Kafka sink initializing. Now attempting to connect to Kafka topic")
4042

4143
// Initialize Kafka connection
42-
val (broker, topic) = Utils.parseKafkaConfig(conf, logger)
44+
val (broker, topic, properties) = Utils.parseKafkaConfig(conf, logger)
4345
private var producer: Producer[String, Array[Byte]] = _
4446

4547
var appId: String = SparkSession.getActiveSession match {
@@ -248,6 +250,7 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
248250
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
249251
props.put("value.serializer", classOf[ByteArraySerializer].getName)
250252
props.put("client.id", "spark-measure")
253+
properties.foreach{ case (k, v) => props.put(k, v) }
251254
producer = new KafkaProducer(props)
252255
}
253256
)
@@ -343,4 +346,4 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) {
343346
)
344347
report(point2)
345348
}
346-
}
349+
}

src/main/scala/ch/cern/sparkmeasure/Utils.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ object Utils {
245245
influxdbStagemetrics
246246
}
247247

248-
def parseKafkaConfig(conf: SparkConf, logger: Logger): (String, String) = {
248+
def parseKafkaConfig(conf: SparkConf, logger: Logger): (String, String, Map[String, String]) = {
249249
// handle Kafka broker and topic
250250
val broker = conf.get("spark.sparkmeasure.kafkaBroker", "")
251251
val topic = conf.get("spark.sparkmeasure.kafkaTopic", "")
@@ -255,7 +255,15 @@ object Utils {
255255
logger.info(s"Kafka broker: $broker")
256256
logger.info(s"Kafka topic: $topic")
257257
}
258-
(broker, topic)
258+
259+
val prefix = "spark.sparkmeasure.kafka."
260+
val kafkaParams: Map[String, String] = conf.getAll
261+
.collect {
262+
case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
263+
}
264+
.toMap
265+
266+
(broker, topic, kafkaParams)
259267
}
260268

261269
def parsePushGatewayConfig(conf: SparkConf, logger: Logger): PushgatewayConfig = {

0 commit comments

Comments (0)