Skip to content

Support datadog metrics submission v2 endpoint #1391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
7 changes: 6 additions & 1 deletion reporters/kamon-datadog/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ kamon {
#
api {

# API endpoint to which metrics time series data will be posted.
# Version of the API to use to submit metrics.
# More info: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
version = "v1"

# Metrics submission API endpoint to which metrics time series data will be posted.
# More info: https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
api-url = "https://app.datadoghq.com/api/v1/series"

# Datadog API key to use to send metrics to Datadog directly over HTTPS. The API key will be combined with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package kamon.datadog

import java.lang.StringBuilder
import java.nio.charset.StandardCharsets
import java.text.{DecimalFormat, DecimalFormatSymbols}
import java.time.Duration
Expand All @@ -26,25 +25,23 @@ import kamon.metric.MeasurementUnit.Dimension.{Information, Time}
import kamon.metric.{MeasurementUnit, MetricSnapshot, PeriodSnapshot}
import kamon.tag.{Tag, TagSet}
import kamon.util.{EnvironmentTags, Filter}
import kamon.{Kamon, module}
import kamon.Kamon
import kamon.datadog.DatadogAPIReporter.Configuration
import kamon.module.{MetricReporter, ModuleFactory}
import org.slf4j.LoggerFactory
import org.slf4j.event.Level

import scala.collection.JavaConverters._
import scala.util.{Failure, Success}

class DatadogAPIReporterFactory extends ModuleFactory {
override def create(settings: ModuleFactory.Settings): DatadogAPIReporter = {
val config = DatadogAPIReporter.readConfiguration(settings.config)
new DatadogAPIReporter(config, new HttpClient(config.httpConfig, usingAgent = false))
new DatadogAPIReporter(config)
}
}

class DatadogAPIReporter(
@volatile private var configuration: Configuration,
@volatile private var httpClient: HttpClient
) extends MetricReporter {
class DatadogAPIReporter(@volatile private var configuration: Configuration) extends MetricReporter {
import DatadogAPIReporter._

private val logger = LoggerFactory.getLogger(classOf[DatadogAPIReporter])
Expand All @@ -60,13 +57,11 @@ class DatadogAPIReporter(
}

override def reconfigure(config: Config): Unit = {
val newConfiguration = readConfiguration(config)
configuration = newConfiguration
httpClient = new HttpClient(configuration.httpConfig, usingAgent = false)
configuration = readConfiguration(config)
}

override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match {
configuration.httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match {
case Failure(e) =>
logger.logAtLevel(configuration.failureLogLevel, e.getMessage)
case Success(response) =>
Expand All @@ -79,56 +74,100 @@ class DatadogAPIReporter(

val host = Kamon.environment.host
val interval = Math.round(Duration.between(snapshot.from, snapshot.to).toMillis() / 1000d)
val seriesBuilder = new StringBuilder()

val payloadBuilder = new StringBuilder()

val apiVersion = configuration.apiVersion

@inline
def doubleToPercentileString(double: Double) = {
if (double == double.toLong) f"${double.toLong}%d"
else f"$double%s"
}

@inline
def metricTypeJsonPart(metricTypeStr: String) = {
if (apiVersion == "v1") {
s""""type":"$metricTypeStr""""
}
// v2 requires an enum type in the payload instead of the string name
else {
if (metricTypeStr == countMetricType) {
""""type":1"""
} else if (metricTypeStr == gaugeMetricType) {
""""type":3"""
} else {
// This reporter currently only supports counter and gauges.
// `0` is an undefined metric type in Datadog API.
""""type":0"""
}
}
}

lazy val hostJsonPart = {
if (apiVersion == "v1") {
s""""host":"$host""""
}
// v2 has a "resources" array field where "host" should be defined
else {
s""""resources":[{"name":"$host","type":"host"}]"""
}
}

def addDistribution(metric: MetricSnapshot.Distributions): Unit = {
val unit = metric.settings.unit
metric.instruments.foreach { d =>
val dist = d.value

val average = if (dist.count > 0L) (dist.sum / dist.count) else 0L
addMetric(metric.name + ".avg", valueFormat.format(scale(average, unit)), gauge, d.tags)
addMetric(metric.name + ".count", valueFormat.format(dist.count), count, d.tags)
addMetric(metric.name + ".median", valueFormat.format(scale(dist.percentile(50d).value, unit)), gauge, d.tags)
addMetric(metric.name + ".avg", valueFormat.format(scale(average, unit)), gaugeMetricType, d.tags)
addMetric(metric.name + ".count", valueFormat.format(dist.count), countMetricType, d.tags)
addMetric(
metric.name + ".median",
valueFormat.format(scale(dist.percentile(50d).value, unit)),
gaugeMetricType,
d.tags
)
configuration.percentiles.foreach { p =>
addMetric(
metric.name + s".${doubleToPercentileString(p)}percentile",
valueFormat.format(scale(dist.percentile(p).value, unit)),
gauge,
gaugeMetricType,
d.tags
)
}
addMetric(metric.name + ".max", valueFormat.format(scale(dist.max, unit)), gauge, d.tags)
addMetric(metric.name + ".min", valueFormat.format(scale(dist.min, unit)), gauge, d.tags)
addMetric(metric.name + ".max", valueFormat.format(scale(dist.max, unit)), gaugeMetricType, d.tags)
addMetric(metric.name + ".min", valueFormat.format(scale(dist.min, unit)), gaugeMetricType, d.tags)
}
}

def addMetric(metricName: String, value: String, metricType: String, tags: TagSet): Unit = {
val customTags = (configuration.extraTags ++ tags.iterator(_.toString).map(p => p.key -> p.value).filter(t =>
configuration.tagFilter.accept(t._1)
)).map { case (k, v) ⇒ quote"$k:$v" }

val allTagsString = customTags.mkString("[", ",", "]")

if (seriesBuilder.length() > 0) seriesBuilder.append(",")
if (payloadBuilder.nonEmpty) payloadBuilder.append(",")

seriesBuilder
.append(
s"""{"metric":"$metricName","interval":$interval,"points":[[$timestamp,$value]],"type":"$metricType","host":"$host","tags":$allTagsString}"""
)
val point = if (apiVersion == "v1") {
s"[$timestamp,$value]"
} else {
s"""{"timestamp":$timestamp,"value":$value}"""
}

payloadBuilder
.append(s"""{"metric":"$metricName",${metricTypeJsonPart(
metricType
)},"interval":$interval,"points":[$point],"tags":$allTagsString,$hostJsonPart}""".stripMargin)
}

snapshot.counters.foreach { snap =>
snap.instruments.foreach { instrument =>
addMetric(
snap.name,
valueFormat.format(scale(instrument.value, snap.settings.unit)),
count,
countMetricType,
instrument.tags
)
}
Expand All @@ -138,19 +177,18 @@ class DatadogAPIReporter(
addMetric(
snap.name,
valueFormat.format(scale(instrument.value, snap.settings.unit)),
gauge,
gaugeMetricType,
instrument.tags
)
}
}

(snapshot.histograms ++ snapshot.rangeSamplers ++ snapshot.timers).foreach(addDistribution)

seriesBuilder
payloadBuilder
.insert(0, "{\"series\":[")
.append("]}")
.toString()
.getBytes(StandardCharsets.UTF_8)
.toString.getBytes(StandardCharsets.UTF_8)

}

Expand All @@ -166,11 +204,42 @@ class DatadogAPIReporter(
}

private object DatadogAPIReporter {
val count = "count"
val gauge = "gauge"
val countMetricType = "count"
val gaugeMetricType = "gauge"

private def configureHttpClient(config: Config): (String, HttpClient) = {
val baseClient = buildHttpClient(config)

val apiVersion = {
val v = config.getString("version")
if (v != "v1" && v != "v2") {
sys.error(s"Invalid Datadog API version, the possible values are [v1, v2].")
}
v
}

val apiKey = config.getString("api-key")
val apiUrl = {
val url = config.getString("api-url")

if (apiVersion == "v1") {
url + "?api_key=" + apiKey
} else {
url
}
}
val headers = {
if (apiVersion == "v2") {
List("DD-API-KEY" -> apiKey)
} else List.empty
}

(apiVersion, baseClient.copy(endpoint = apiUrl, headers = headers))
}

case class Configuration(
httpConfig: Config,
httpClient: HttpClient,
apiVersion: String,
percentiles: Set[Double],
timeUnit: MeasurementUnit,
informationUnit: MeasurementUnit,
Expand All @@ -185,6 +254,8 @@ private object DatadogAPIReporter {

def readConfiguration(config: Config): Configuration = {
val datadogConfig = config.getConfig("kamon.datadog")
val httpConfig = datadogConfig.getConfig("api")
val (apiVersion, httpClient) = configureHttpClient(httpConfig)

// Remove the "host" tag since it gets added to the datadog payload separately
val extraTags = EnvironmentTags
Expand All @@ -194,7 +265,8 @@ private object DatadogAPIReporter {
.map(p => p.key -> Tag.unwrapValue(p).toString)

Configuration(
datadogConfig.getConfig("api"),
httpClient = httpClient,
apiVersion = apiVersion,
percentiles = datadogConfig.getDoubleList("percentiles").asScala.toList.map(_.toDouble).toSet,
timeUnit = readTimeUnit(datadogConfig.getString("time-unit")),
informationUnit = readInformationUnit(datadogConfig.getString("information-unit")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object DatadogSpanReporter {

Configuration(
getTranslator(config),
new HttpClient(config.getConfig(DatadogSpanReporter.httpConfigPath), usingAgent = true),
buildHttpClient(config.getConfig(DatadogSpanReporter.httpConfigPath)),
Kamon.filter("kamon.datadog.environment-tags.filter"),
EnvironmentTags.from(Kamon.environment, config.getConfig("kamon.datadog.environment-tags")).without("service")
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package kamon.datadog

import okhttp3.{MediaType, OkHttpClient, Request, RequestBody, Response}

import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

private[datadog] case class HttpClient(
endpoint: String,
headers: List[(String, String)],
usingCompression: Boolean,
connectTimeout: Duration,
readTimeout: Duration,
writeTimeout: Duration,
retries: Int,
initRetryDelay: Duration
) {

private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504)

private lazy val httpClient: OkHttpClient = {
// Apparently okhttp doesn't require explicit closing of the connection
val builder = new OkHttpClient.Builder()
.connectTimeout(connectTimeout.toMillis, TimeUnit.MILLISECONDS)
.readTimeout(readTimeout.toMillis, TimeUnit.MILLISECONDS)
.writeTimeout(writeTimeout.toMillis, TimeUnit.MILLISECONDS)
.retryOnConnectionFailure(true)

if (usingCompression) builder.addInterceptor(new DeflateInterceptor).build()
else builder.build()
}

@tailrec
private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = {
// Try executing the request
val responseAttempt = Try(httpClient.newCall(request).execute())

if (attempt >= retries - 1) {
responseAttempt
} else {
responseAttempt match {
// If the request succeeded but with a retryable HTTP status code.
case Success(response) if retryableStatusCodes.contains(response.code) =>
response.close()
Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong)
doRequestWithRetries(request, attempt + 1)

// Either the request succeeded with an HTTP status not included in `retryableStatusCodes`
// or we have an unknown failure
case _ =>
responseAttempt
}
}
}

private def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = {
val body = RequestBody.create(contentBody, MediaType.parse(contentType))
val request = {
val builder = new Request.Builder().url(endpoint).method(method, body)
headers.foreach {
case (name, value) => builder.header(name, value)
}
builder.build()
}

doRequestWithRetries(request) match {
case Success(response) =>
val responseBody = response.body().string()
response.close()
if (response.isSuccessful) {
Success(responseBody)
} else {
Failure(new Exception(
s"Failed to $method metrics to Datadog with status code [${response.code()}], Body: [$responseBody]"
))
}
case Failure(f) if f.getCause != null =>
Failure(f.getCause)
case f @ Failure(_) =>
f.asInstanceOf[Try[String]]
}
}

def doPost(contentType: String, contentBody: Array[Byte]): Try[String] = {
doMethodWithBody("POST", contentType, contentBody)
}

def doJsonPut(contentBody: String): Try[String] = {
// Datadog Agent does not accept ";charset=UTF-8", using bytes to send Json posts
doMethodWithBody("PUT", "application/json", contentBody.getBytes(StandardCharsets.UTF_8))
}
}
Loading