Commit 179dc13

Author: Jason White

Merge pull request #9 from JasonMWhite/streaming_listener

adding streaming listener

2 parents: a6702b6 + 892d632

File tree: 4 files changed (+66 / -19 lines)

4 files changed

+66
-19
lines changed

pom.xml

Lines changed: 6 additions & 1 deletion
@@ -5,7 +5,7 @@
   <groupId>com.shopify</groupId>
   <artifactId>spark-datadog-relay</artifactId>
   <packaging>jar</packaging>
-  <version>0.2</version>
+  <version>0.3</version>
   <name>Datadog Relay for Spark</name>

   <properties>
@@ -47,6 +47,11 @@
       <version>2.2.4</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>1.5.1</version>
+    </dependency>
   </dependencies>

   <build>
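
The new spark-streaming_2.10 dependency supplies the StreamingListener API that the new relay class below extends; its version is normally kept in step with the Spark build running on the target cluster (here 1.5.1 on Scala 2.10). A minimal sketch of the types it makes available:

// Provided by spark-streaming_2.10:1.5.1 (assumed to match the cluster's Spark build);
// these are the types DatadogStreamingRelay extends and receives.
import org.apache.spark.streaming.scheduler.{
  StreamingListener,
  StreamingListenerBatchStarted,
  StreamingListenerBatchCompleted
}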

src/main/scala/org/apache/spark/DatadogRelay.scala

Lines changed: 1 addition & 18 deletions
@@ -18,24 +18,7 @@ import scala.concurrent.ExecutionContext.Implicits.global

 class DatadogRelay(conf: SparkConf) extends SparkFirehoseListener {

-  val tags: List[String] = {
-    val datadogTags = conf.get("spark.datadog.tags", "")
-    if (datadogTags == "") List() else datadogTags.split(",").toList
-  }
-
-  val statsdOption: Option[NonBlockingStatsDClient] = {
-    try {
-      Some(new NonBlockingStatsDClient(
-        "spark",
-        "localhost",
-        8125,
-        tags.mkString(",")
-      ))
-    } catch {
-      case ex: StatsDClientException => None
-      case ex: Exception => throw ex
-    }
-  }
+  val statsdOption: Option[NonBlockingStatsDClient] = SparkStatsDHelper.getClient(conf)

   def taskBaseMetrics(statsd: NonBlockingStatsDClient, e: SparkListenerTaskEnd): Unit = {
     statsd.incrementCounter("firehose.taskEnded")
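
For context, a minimal sketch (not part of this commit) of how a job might enable the batch-level relay: Spark's spark.extraListeners setting accepts listener classes whose constructor takes a SparkConf, which DatadogRelay satisfies. The tag values below are purely illustrative.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver setup; the tag values and app name are illustrative assumptions.
val conf = new SparkConf()
  .setAppName("relay-example")
  .set("spark.extraListeners", "org.apache.spark.DatadogRelay")
  .set("spark.datadog.tags", "env:staging,team:data")
val sc = new SparkContext(conf)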

src/main/scala/org/apache/spark/DatadogStreamingRelay.scala

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+package org.apache.spark
+
+import org.apache.spark.streaming.scheduler._
+import com.timgroup.statsd.{NonBlockingStatsDClient, StatsDClientException}
+
+class DatadogStreamingRelay(conf: SparkConf) extends StreamingListener {
+
+  val statsdOption: Option[NonBlockingStatsDClient] = SparkStatsDHelper.getClient(conf)
+
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
+    statsdOption.foreach { statsd =>
+      statsd.incrementCounter("firehose.batchStarted")
+    }
+  }
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+    statsdOption.foreach { statsd =>
+      statsd.incrementCounter("firehose.batchCompleted")
+      statsd.count("firehose.batchRecordsProcessed", batchCompleted.batchInfo.numRecords)
+      batchCompleted.batchInfo.processingDelay.foreach { delay =>
+        statsd.recordExecutionTime("firehose.batchProcessingDelay", delay)
+      }
+      batchCompleted.batchInfo.schedulingDelay.foreach { delay =>
+        statsd.recordExecutionTime("firehose.batchSchedulingDelay", delay)
+      }
+
+      batchCompleted.batchInfo.processingDelay.foreach { delay =>
+        statsd.recordExecutionTime("firehose.batchProcessingDelay", delay)
+      }
+      batchCompleted.batchInfo.totalDelay.foreach { delay =>
+        statsd.recordExecutionTime("firehose.batchTotalDelay", delay)
+      }
+    }
+  }
+}
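
A minimal usage sketch, assuming Spark Streaming 1.5.x: unlike SparkListeners, StreamingListeners are attached to the StreamingContext directly, so the new relay is registered explicitly by the driver program. Batch interval and app name below are illustrative assumptions.

import org.apache.spark.SparkConf
import org.apache.spark.DatadogStreamingRelay
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical wiring: create the streaming context and attach the new listener.
val conf = new SparkConf().setAppName("streaming-relay-example")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.addStreamingListener(new DatadogStreamingRelay(conf))
// ... define input streams and transformations, then start the context:
ssc.start()
ssc.awaitTermination()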

src/main/scala/org/apache/spark/SparkStatsDHelper.scala

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+package org.apache.spark
+
+import com.timgroup.statsd.{NonBlockingStatsDClient, StatsDClientException}
+
+object SparkStatsDHelper {
+  def tags(conf: SparkConf): List[String] = {
+    val datadogTags = conf.get("spark.datadog.tags", "")
+    if (datadogTags == "") List() else datadogTags.split(",").toList
+  }
+
+  def getClient(conf: SparkConf): Option[NonBlockingStatsDClient] = {
+    try {
+      Some(new NonBlockingStatsDClient(
+        "spark",
+        "localhost",
+        8125,
+        tags(conf).mkString(",")
+      ))
+    } catch {
+      case ex: StatsDClientException => None
+      case ex: Exception => throw ex
+    }
+  }
+}
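
To illustrate the helper's behaviour (values are hypothetical): tags parses the comma-separated spark.datadog.tags setting, and getClient returns None when the non-blocking StatsD client cannot be constructed, which is why both relays guard every metric call with statsdOption.foreach.

import org.apache.spark.{SparkConf, SparkStatsDHelper}

// Illustrative only; tag values are assumptions, not defaults of the library.
val tagged = new SparkConf().set("spark.datadog.tags", "env:prod,team:data")
SparkStatsDHelper.tags(tagged)          // List("env:prod", "team:data")
SparkStatsDHelper.tags(new SparkConf()) // List() when the setting is absent
SparkStatsDHelper.getClient(tagged)     // Some(client) targeting localhost:8125, or None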
